summaryrefslogtreecommitdiff
path: root/openstackclient/tests/unit/network
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient/tests/unit/network')
-rw-r--r--openstackclient/tests/unit/network/__init__.py0
-rw-r--r--openstackclient/tests/unit/network/test_common.py174
-rw-r--r--openstackclient/tests/unit/network/v2/__init__.py0
-rw-r--r--openstackclient/tests/unit/network/v2/fakes.py1100
-rw-r--r--openstackclient/tests/unit/network/v2/test_address_scope.py399
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip.py565
-rw-r--r--openstackclient/tests/unit/network/v2/test_floating_ip_pool.py97
-rw-r--r--openstackclient/tests/unit/network/v2/test_ip_availability.py172
-rw-r--r--openstackclient/tests/unit/network/v2/test_network.py1059
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_agent.py294
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_rbac.py430
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_segment.py200
-rw-r--r--openstackclient/tests/unit/network/v2/test_port.py696
-rw-r--r--openstackclient/tests/unit/network/v2/test_router.py751
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group.py770
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_rule.py1177
-rw-r--r--openstackclient/tests/unit/network/v2/test_subnet.py978
-rw-r--r--openstackclient/tests/unit/network/v2/test_subnet_pool.py722
18 files changed, 9584 insertions, 0 deletions
diff --git a/openstackclient/tests/unit/network/__init__.py b/openstackclient/tests/unit/network/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/openstackclient/tests/unit/network/__init__.py
diff --git a/openstackclient/tests/unit/network/test_common.py b/openstackclient/tests/unit/network/test_common.py
new file mode 100644
index 00000000..325aad2a
--- /dev/null
+++ b/openstackclient/tests/unit/network/test_common.py
@@ -0,0 +1,174 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import mock
+
+from openstackclient.network import common
+from openstackclient.tests.unit import utils
+
+
+def _add_common_argument(parser):
+ parser.add_argument(
+ 'common',
+ metavar='<common>',
+ help='Common argument',
+ )
+ return parser
+
+
+def _add_network_argument(parser):
+ parser.add_argument(
+ 'network',
+ metavar='<network>',
+ help='Network argument',
+ )
+ return parser
+
+
+def _add_compute_argument(parser):
+ parser.add_argument(
+ 'compute',
+ metavar='<compute>',
+ help='Compute argument',
+ )
+ return parser
+
+
+class FakeNetworkAndComputeCommand(common.NetworkAndComputeCommand):
+
+ def update_parser_common(self, parser):
+ return _add_common_argument(parser)
+
+ def update_parser_network(self, parser):
+ return _add_network_argument(parser)
+
+ def update_parser_compute(self, parser):
+ return _add_compute_argument(parser)
+
+ def take_action_network(self, client, parsed_args):
+ return client.network_action(parsed_args)
+
+ def take_action_compute(self, client, parsed_args):
+ return client.compute_action(parsed_args)
+
+
+class FakeNetworkAndComputeLister(common.NetworkAndComputeLister):
+
+ def update_parser_common(self, parser):
+ return _add_common_argument(parser)
+
+ def update_parser_network(self, parser):
+ return _add_network_argument(parser)
+
+ def update_parser_compute(self, parser):
+ return _add_compute_argument(parser)
+
+ def take_action_network(self, client, parsed_args):
+ return client.network_action(parsed_args)
+
+ def take_action_compute(self, client, parsed_args):
+ return client.compute_action(parsed_args)
+
+
+class FakeNetworkAndComputeShowOne(common.NetworkAndComputeShowOne):
+
+ def update_parser_common(self, parser):
+ return _add_common_argument(parser)
+
+ def update_parser_network(self, parser):
+ return _add_network_argument(parser)
+
+ def update_parser_compute(self, parser):
+ return _add_compute_argument(parser)
+
+ def take_action_network(self, client, parsed_args):
+ return client.network_action(parsed_args)
+
+ def take_action_compute(self, client, parsed_args):
+ return client.compute_action(parsed_args)
+
+
+class TestNetworkAndCompute(utils.TestCommand):
+
+ def setUp(self):
+ super(TestNetworkAndCompute, self).setUp()
+
+ self.namespace = argparse.Namespace()
+
+ # Create network client mocks.
+ self.app.client_manager.network = mock.Mock()
+ self.network = self.app.client_manager.network
+ self.network.network_action = mock.Mock(
+ return_value='take_action_network')
+
+ # Create compute client mocks.
+ self.app.client_manager.compute = mock.Mock()
+ self.compute = self.app.client_manager.compute
+ self.compute.compute_action = mock.Mock(
+ return_value='take_action_compute')
+
+ # Subclasses can override the command object to test.
+ self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace)
+
+ def test_take_action_network(self):
+ arglist = [
+ 'common',
+ 'network'
+ ]
+ verifylist = [
+ ('common', 'common'),
+ ('network', 'network')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.network_action.assert_called_with(parsed_args)
+ self.assertEqual('take_action_network', result)
+
+ def test_take_action_compute(self):
+ arglist = [
+ 'common',
+ 'compute'
+ ]
+ verifylist = [
+ ('common', 'common'),
+ ('compute', 'compute')
+ ]
+
+ self.app.client_manager.network_endpoint_enabled = False
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.compute.compute_action.assert_called_with(parsed_args)
+ self.assertEqual('take_action_compute', result)
+
+
+class TestNetworkAndComputeCommand(TestNetworkAndCompute):
+
+ def setUp(self):
+ super(TestNetworkAndComputeCommand, self).setUp()
+ self.cmd = FakeNetworkAndComputeCommand(self.app, self.namespace)
+
+
+class TestNetworkAndComputeLister(TestNetworkAndCompute):
+
+ def setUp(self):
+ super(TestNetworkAndComputeLister, self).setUp()
+ self.cmd = FakeNetworkAndComputeLister(self.app, self.namespace)
+
+
+class TestNetworkAndComputeShowOne(TestNetworkAndCompute):
+
+ def setUp(self):
+ super(TestNetworkAndComputeShowOne, self).setUp()
+ self.cmd = FakeNetworkAndComputeShowOne(self.app, self.namespace)
diff --git a/openstackclient/tests/unit/network/v2/__init__.py b/openstackclient/tests/unit/network/v2/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/__init__.py
diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py
new file mode 100644
index 00000000..ed30bad3
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/fakes.py
@@ -0,0 +1,1100 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import copy
+import mock
+import uuid
+
+from openstackclient.tests.unit import fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit import utils
+
+
+QUOTA = {
+ "subnet": 10,
+ "network": 10,
+ "floatingip": 50,
+ "subnetpool": -1,
+ "security_group_rule": 100,
+ "security_group": 10,
+ "router": 10,
+ "rbac_policy": -1,
+ "port": 50,
+ "vip": 10,
+ "member": 10,
+ "health_monitor": 10,
+}
+
+
+class FakeNetworkV2Client(object):
+
+ def __init__(self, **kwargs):
+ self.extensions = mock.Mock()
+ self.extensions.resource_class = fakes.FakeResource(None, {})
+
+
+class TestNetworkV2(utils.TestCommand):
+
+ def setUp(self):
+ super(TestNetworkV2, self).setUp()
+
+ self.namespace = argparse.Namespace()
+
+ self.app.client_manager.session = mock.Mock()
+
+ self.app.client_manager.network = FakeNetworkV2Client(
+ endpoint=fakes.AUTH_URL,
+ token=fakes.AUTH_TOKEN,
+ )
+
+ self.app.client_manager.identity = (
+ identity_fakes_v3.FakeIdentityv3Client(
+ endpoint=fakes.AUTH_URL,
+ token=fakes.AUTH_TOKEN,
+ )
+ )
+
+
+class FakeAddressScope(object):
+ """Fake one or more address scopes."""
+
+ @staticmethod
+ def create_one_address_scope(attrs=None):
+ """Create a fake address scope.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with name, id, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ address_scope_attrs = {
+ 'name': 'address-scope-name-' + uuid.uuid4().hex,
+ 'id': 'address-scope-id-' + uuid.uuid4().hex,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'shared': False,
+ 'ip_version': 4,
+ }
+
+ # Overwrite default attributes.
+ address_scope_attrs.update(attrs)
+
+ address_scope = fakes.FakeResource(
+ info=copy.deepcopy(address_scope_attrs),
+ loaded=True)
+
+ # Set attributes with special mapping in OpenStack SDK.
+ address_scope.project_id = address_scope_attrs['tenant_id']
+
+ return address_scope
+
+ @staticmethod
+ def create_address_scopes(attrs=None, count=2):
+ """Create multiple fake address scopes.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of address scopes to fake
+ :return:
+ A list of FakeResource objects faking the address scopes
+ """
+ address_scopes = []
+ for i in range(0, count):
+ address_scopes.append(
+ FakeAddressScope.create_one_address_scope(attrs))
+
+ return address_scopes
+
+ @staticmethod
+ def get_address_scopes(address_scopes=None, count=2):
+ """Get an iterable MagicMock object with a list of faked address scopes.
+
+ If address scopes list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List address scopes:
+ A list of FakeResource objects faking address scopes
+ :param int count:
+ The number of address scopes to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ address scopes
+ """
+ if address_scopes is None:
+ address_scopes = FakeAddressScope.create_address_scopes(count)
+ return mock.MagicMock(side_effect=address_scopes)
+
+
+class FakeAvailabilityZone(object):
+ """Fake one or more network availability zones (AZs)."""
+
+ @staticmethod
+ def create_one_availability_zone(attrs=None):
+ """Create a fake AZ.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with name, state, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ availability_zone = {
+ 'name': uuid.uuid4().hex,
+ 'state': 'available',
+ 'resource': 'network',
+ }
+
+ # Overwrite default attributes.
+ availability_zone.update(attrs)
+
+ availability_zone = fakes.FakeResource(
+ info=copy.deepcopy(availability_zone),
+ loaded=True)
+ return availability_zone
+
+ @staticmethod
+ def create_availability_zones(attrs=None, count=2):
+ """Create multiple fake AZs.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of AZs to fake
+ :return:
+ A list of FakeResource objects faking the AZs
+ """
+ availability_zones = []
+ for i in range(0, count):
+ availability_zone = \
+ FakeAvailabilityZone.create_one_availability_zone(attrs)
+ availability_zones.append(availability_zone)
+
+ return availability_zones
+
+
+class FakeIPAvailability(object):
+ """Fake one or more network ip availabilities."""
+
+ @staticmethod
+ def create_one_ip_availability():
+ """Create a fake list with ip availability stats of a network.
+
+ :return:
+ A FakeResource object with network_name, network_id, etc.
+ """
+
+ # Set default attributes.
+ network_ip_availability = {
+ 'network_id': 'network-id-' + uuid.uuid4().hex,
+ 'network_name': 'network-name-' + uuid.uuid4().hex,
+ 'tenant_id': '',
+ 'subnet_ip_availability': [],
+ 'total_ips': 254,
+ 'used_ips': 6,
+ }
+
+ network_ip_availability = fakes.FakeResource(
+ info=copy.deepcopy(network_ip_availability),
+ loaded=True)
+ return network_ip_availability
+
+ @staticmethod
+ def create_ip_availability(count=2):
+ """Create fake list of ip availability stats of multiple networks.
+
+ :param int count:
+ The number of networks to fake
+ :return:
+ A list of FakeResource objects faking network ip availability stats
+ """
+ network_ip_availabilities = []
+ for i in range(0, count):
+ network_ip_availability = \
+ FakeIPAvailability.create_one_ip_availability()
+ network_ip_availabilities.append(network_ip_availability)
+
+ return network_ip_availabilities
+
+
+class FakeExtension(object):
+ """Fake one or more extension."""
+
+ @staticmethod
+ def create_one_extension(attrs=None):
+ """Create a fake extension.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with name, namespace, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ extension_info = {
+ 'name': 'name-' + uuid.uuid4().hex,
+ 'namespace': 'http://docs.openstack.org/network/',
+ 'description': 'description-' + uuid.uuid4().hex,
+ 'updated': '2013-07-09T12:00:0-00:00',
+ 'alias': 'Dystopian',
+ 'links': '[{"href":''"https://github.com/os/network", "type"}]',
+ }
+
+ # Overwrite default attributes.
+ extension_info.update(attrs)
+
+ extension = fakes.FakeResource(
+ info=copy.deepcopy(extension_info),
+ loaded=True)
+ return extension
+
+
+class FakeNetwork(object):
+ """Fake one or more networks."""
+
+ @staticmethod
+ def create_one_network(attrs=None):
+ """Create a fake network.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, name, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ network_attrs = {
+ 'id': 'network-id-' + uuid.uuid4().hex,
+ 'name': 'network-name-' + uuid.uuid4().hex,
+ 'status': 'ACTIVE',
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'admin_state_up': True,
+ 'shared': False,
+ 'subnets': ['a', 'b'],
+ 'provider_network_type': 'vlan',
+ 'router:external': True,
+ 'availability_zones': [],
+ 'availability_zone_hints': [],
+ 'is_default': False,
+ 'port_security_enabled': True,
+ }
+
+ # Overwrite default attributes.
+ network_attrs.update(attrs)
+
+ network = fakes.FakeResource(info=copy.deepcopy(network_attrs),
+ loaded=True)
+
+ # Set attributes with special mapping in OpenStack SDK.
+ network.project_id = network_attrs['tenant_id']
+ network.is_router_external = network_attrs['router:external']
+ network.is_port_security_enabled = \
+ network_attrs['port_security_enabled']
+
+ return network
+
+ @staticmethod
+ def create_networks(attrs=None, count=2):
+ """Create multiple fake networks.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of networks to fake
+ :return:
+ A list of FakeResource objects faking the networks
+ """
+ networks = []
+ for i in range(0, count):
+ networks.append(FakeNetwork.create_one_network(attrs))
+
+ return networks
+
+ @staticmethod
+ def get_networks(networks=None, count=2):
+ """Get an iterable MagicMock object with a list of faked networks.
+
+ If networks list is provided, then initialize the Mock object with the
+ list. Otherwise create one.
+
+ :param List networks:
+ A list of FakeResource objects faking networks
+ :param int count:
+ The number of networks to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ networks
+ """
+ if networks is None:
+ networks = FakeNetwork.create_networks(count)
+ return mock.MagicMock(side_effect=networks)
+
+
+class FakeNetworkSegment(object):
+ """Fake one or more network segments."""
+
+ @staticmethod
+ def create_one_network_segment(attrs=None):
+ """Create a fake network segment.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object faking the network segment
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ network_segment_attrs = {
+ 'id': 'network-segment-id-' + uuid.uuid4().hex,
+ 'network_id': 'network-id-' + uuid.uuid4().hex,
+ 'network_type': 'vlan',
+ 'physical_network': 'physical-network-name-' + uuid.uuid4().hex,
+ 'segmentation_id': 1024,
+ }
+
+ # Overwrite default attributes.
+ network_segment_attrs.update(attrs)
+
+ network_segment = fakes.FakeResource(
+ info=copy.deepcopy(network_segment_attrs),
+ loaded=True
+ )
+
+ return network_segment
+
+ @staticmethod
+ def create_network_segments(attrs=None, count=2):
+ """Create multiple fake network segments.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of network segments to fake
+ :return:
+ A list of FakeResource objects faking the network segments
+ """
+ network_segments = []
+ for i in range(0, count):
+ network_segments.append(
+ FakeNetworkSegment.create_one_network_segment(attrs)
+ )
+ return network_segments
+
+
+class FakePort(object):
+ """Fake one or more ports."""
+
+ @staticmethod
+ def create_one_port(attrs=None):
+ """Create a fake port.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, name, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ port_attrs = {
+ 'admin_state_up': True,
+ 'allowed_address_pairs': [{}],
+ 'binding:host_id': 'binding-host-id-' + uuid.uuid4().hex,
+ 'binding:profile': {},
+ 'binding:vif_details': {},
+ 'binding:vif_type': 'ovs',
+ 'binding:vnic_type': 'normal',
+ 'device_id': 'device-id-' + uuid.uuid4().hex,
+ 'device_owner': 'compute:nova',
+ 'dns_assignment': [{}],
+ 'dns_name': 'dns-name-' + uuid.uuid4().hex,
+ 'extra_dhcp_opts': [{}],
+ 'fixed_ips': [{}],
+ 'id': 'port-id-' + uuid.uuid4().hex,
+ 'mac_address': 'fa:16:3e:a9:4e:72',
+ 'name': 'port-name-' + uuid.uuid4().hex,
+ 'network_id': 'network-id-' + uuid.uuid4().hex,
+ 'port_security_enabled': True,
+ 'security_groups': [],
+ 'status': 'ACTIVE',
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ }
+
+ # Overwrite default attributes.
+ port_attrs.update(attrs)
+
+ port = fakes.FakeResource(info=copy.deepcopy(port_attrs),
+ loaded=True)
+
+ # Set attributes with special mappings in OpenStack SDK.
+ port.project_id = port_attrs['tenant_id']
+ port.binding_host_id = port_attrs['binding:host_id']
+ port.binding_profile = port_attrs['binding:profile']
+ port.binding_vif_details = port_attrs['binding:vif_details']
+ port.binding_vif_type = port_attrs['binding:vif_type']
+ port.binding_vnic_type = port_attrs['binding:vnic_type']
+
+ return port
+
+ @staticmethod
+ def create_ports(attrs=None, count=2):
+ """Create multiple fake ports.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of ports to fake
+ :return:
+ A list of FakeResource objects faking the ports
+ """
+ ports = []
+ for i in range(0, count):
+ ports.append(FakePort.create_one_port(attrs))
+
+ return ports
+
+ @staticmethod
+ def get_ports(ports=None, count=2):
+ """Get an iterable MagicMock object with a list of faked ports.
+
+ If ports list is provided, then initialize the Mock object with the
+ list. Otherwise create one.
+
+ :param List ports:
+ A list of FakeResource objects faking ports
+ :param int count:
+ The number of ports to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ ports
+ """
+ if ports is None:
+ ports = FakePort.create_ports(count)
+ return mock.MagicMock(side_effect=ports)
+
+
+class FakeNetworkAgent(object):
+ """Fake one or more network agents."""
+
+ @staticmethod
+ def create_one_network_agent(attrs=None):
+ """Create a fake network agent
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, agent_type, and so on.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes
+ agent_attrs = {
+ 'id': 'agent-id-' + uuid.uuid4().hex,
+ 'agent_type': 'agent-type-' + uuid.uuid4().hex,
+ 'host': 'host-' + uuid.uuid4().hex,
+ 'availability_zone': 'zone-' + uuid.uuid4().hex,
+ 'alive': True,
+ 'admin_state_up': True,
+ 'binary': 'binary-' + uuid.uuid4().hex,
+ 'configurations': {'subnet': 2, 'networks': 1},
+ }
+ agent_attrs.update(attrs)
+ agent = fakes.FakeResource(info=copy.deepcopy(agent_attrs),
+ loaded=True)
+ return agent
+
+ @staticmethod
+ def create_network_agents(attrs=None, count=2):
+ """Create multiple fake network agents.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of network agents to fake
+ :return:
+ A list of FakeResource objects faking the network agents
+ """
+ agents = []
+ for i in range(0, count):
+ agents.append(FakeNetworkAgent.create_one_network_agent(attrs))
+
+ return agents
+
+ @staticmethod
+ def get_network_agents(agents=None, count=2):
+ """Get an iterable MagicMock object with a list of faked network agents.
+
+ If network agents list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List agents:
+ A list of FakeResource objects faking network agents
+ :param int count:
+ The number of network agents to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ network agents
+ """
+ if agents is None:
+ agents = FakeNetworkAgent.create_network_agents(count)
+ return mock.MagicMock(side_effect=agents)
+
+
+class FakeNetworkRBAC(object):
+ """Fake one or more network rbac policies."""
+
+ @staticmethod
+ def create_one_network_rbac(attrs=None):
+ """Create a fake network rbac
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, action, target_tenant,
+ tenant_id, type
+ """
+ attrs = attrs or {}
+
+ # Set default attributes
+ rbac_attrs = {
+ 'id': 'rbac-id-' + uuid.uuid4().hex,
+ 'object_type': 'network',
+ 'object_id': 'object-id-' + uuid.uuid4().hex,
+ 'action': 'access_as_shared',
+ 'target_tenant': 'target-tenant-' + uuid.uuid4().hex,
+ 'tenant_id': 'tenant-id-' + uuid.uuid4().hex,
+ }
+ rbac_attrs.update(attrs)
+ rbac = fakes.FakeResource(info=copy.deepcopy(rbac_attrs),
+ loaded=True)
+ # Set attributes with special mapping in OpenStack SDK.
+ rbac.project_id = rbac_attrs['tenant_id']
+ rbac.target_project_id = rbac_attrs['target_tenant']
+ return rbac
+
+ @staticmethod
+ def create_network_rbacs(attrs=None, count=2):
+ """Create multiple fake network rbac policies.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of rbac policies to fake
+ :return:
+ A list of FakeResource objects faking the rbac policies
+ """
+ rbac_policies = []
+ for i in range(0, count):
+ rbac_policies.append(FakeNetworkRBAC.
+ create_one_network_rbac(attrs))
+
+ return rbac_policies
+
+ @staticmethod
+ def get_network_rbacs(rbac_policies=None, count=2):
+ """Get an iterable MagicMock object with a list of faked rbac policies.
+
+ If rbac policies list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List rbac_policies:
+ A list of FakeResource objects faking rbac policies
+ :param int count:
+ The number of rbac policies to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ rbac policies
+ """
+ if rbac_policies is None:
+ rbac_policies = FakeNetworkRBAC.create_network_rbacs(count)
+ return mock.MagicMock(side_effect=rbac_policies)
+
+
+class FakeRouter(object):
+ """Fake one or more routers."""
+
+ @staticmethod
+ def create_one_router(attrs=None):
+ """Create a fake router.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, name, admin_state_up,
+ status, tenant_id
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ router_attrs = {
+ 'id': 'router-id-' + uuid.uuid4().hex,
+ 'name': 'router-name-' + uuid.uuid4().hex,
+ 'status': 'ACTIVE',
+ 'admin_state_up': True,
+ 'distributed': False,
+ 'ha': False,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'routes': [],
+ 'external_gateway_info': {},
+ 'availability_zone_hints': [],
+ 'availability_zones': [],
+ }
+
+ # Overwrite default attributes.
+ router_attrs.update(attrs)
+
+ router = fakes.FakeResource(info=copy.deepcopy(router_attrs),
+ loaded=True)
+
+ # Set attributes with special mapping in OpenStack SDK.
+ router.project_id = router_attrs['tenant_id']
+
+ return router
+
+ @staticmethod
+ def create_routers(attrs=None, count=2):
+ """Create multiple fake routers.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of routers to fake
+ :return:
+ A list of FakeResource objects faking the routers
+ """
+ routers = []
+ for i in range(0, count):
+ routers.append(FakeRouter.create_one_router(attrs))
+
+ return routers
+
+ @staticmethod
+ def get_routers(routers=None, count=2):
+ """Get an iterable MagicMock object with a list of faked routers.
+
+ If routers list is provided, then initialize the Mock object with the
+ list. Otherwise create one.
+
+ :param List routers:
+ A list of FakeResource objects faking routers
+ :param int count:
+ The number of routers to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ routers
+ """
+ if routers is None:
+ routers = FakeRouter.create_routers(count)
+ return mock.MagicMock(side_effect=routers)
+
+
+class FakeSecurityGroup(object):
+ """Fake one or more security groups."""
+
+ @staticmethod
+ def create_one_security_group(attrs=None):
+ """Create a fake security group.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, name, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ security_group_attrs = {
+ 'id': 'security-group-id-' + uuid.uuid4().hex,
+ 'name': 'security-group-name-' + uuid.uuid4().hex,
+ 'description': 'security-group-description-' + uuid.uuid4().hex,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'security_group_rules': [],
+ }
+
+ # Overwrite default attributes.
+ security_group_attrs.update(attrs)
+
+ security_group = fakes.FakeResource(
+ info=copy.deepcopy(security_group_attrs),
+ loaded=True)
+
+ # Set attributes with special mapping in OpenStack SDK.
+ security_group.project_id = security_group_attrs['tenant_id']
+
+ return security_group
+
+ @staticmethod
+ def create_security_groups(attrs=None, count=2):
+ """Create multiple fake security groups.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of security groups to fake
+ :return:
+ A list of FakeResource objects faking the security groups
+ """
+ security_groups = []
+ for i in range(0, count):
+ security_groups.append(
+ FakeSecurityGroup.create_one_security_group(attrs))
+
+ return security_groups
+
+ @staticmethod
+ def get_security_groups(security_groups=None, count=2):
+ """Get an iterable MagicMock object with a list of faked security groups.
+
+ If security groups list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List security groups:
+ A list of FakeResource objects faking security groups
+ :param int count:
+ The number of security groups to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ security groups
+ """
+ if security_groups is None:
+ security_groups = FakeSecurityGroup.create_security_groups(count)
+ return mock.MagicMock(side_effect=security_groups)
+
+
+class FakeSecurityGroupRule(object):
+ """Fake one or more security group rules."""
+
+ @staticmethod
+ def create_one_security_group_rule(attrs=None):
+ """Create a fake security group rule.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ security_group_rule_attrs = {
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'id': 'security-group-rule-id-' + uuid.uuid4().hex,
+ 'port_range_max': None,
+ 'port_range_min': None,
+ 'protocol': 'tcp',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': '0.0.0.0/0',
+ 'security_group_id': 'security-group-id-' + uuid.uuid4().hex,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ }
+
+ # Overwrite default attributes.
+ security_group_rule_attrs.update(attrs)
+
+ security_group_rule = fakes.FakeResource(
+ info=copy.deepcopy(security_group_rule_attrs),
+ loaded=True)
+
+ # Set attributes with special mapping in OpenStack SDK.
+ security_group_rule.project_id = security_group_rule_attrs['tenant_id']
+
+ return security_group_rule
+
+ @staticmethod
+ def create_security_group_rules(attrs=None, count=2):
+ """Create multiple fake security group rules.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of security group rules to fake
+ :return:
+ A list of FakeResource objects faking the security group rules
+ """
+ security_group_rules = []
+ for i in range(0, count):
+ security_group_rules.append(
+ FakeSecurityGroupRule.create_one_security_group_rule(attrs))
+
+ return security_group_rules
+
+ @staticmethod
+ def get_security_group_rules(security_group_rules=None, count=2):
+ """Get an iterable MagicMock object with a list of faked security group rules.
+
+ If security group rules list is provided, then initialize the Mock
+ object with the list. Otherwise create one.
+
+ :param List security group rules:
+ A list of FakeResource objects faking security group rules
+ :param int count:
+ The number of security group rules to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ security group rules
+ """
+ if security_group_rules is None:
+ security_group_rules = (
+ FakeSecurityGroupRule.create_security_group_rules(count))
+ return mock.MagicMock(side_effect=security_group_rules)
+
+
+class FakeSubnet(object):
+ """Fake one or more subnets."""
+
+ @staticmethod
+ def create_one_subnet(attrs=None):
+ """Create a fake subnet.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object faking the subnet
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ project_id = 'project-id-' + uuid.uuid4().hex
+ subnet_attrs = {
+ 'id': 'subnet-id-' + uuid.uuid4().hex,
+ 'name': 'subnet-name-' + uuid.uuid4().hex,
+ 'network_id': 'network-id-' + uuid.uuid4().hex,
+ 'cidr': '10.10.10.0/24',
+ 'tenant_id': project_id,
+ 'enable_dhcp': True,
+ 'dns_nameservers': [],
+ 'allocation_pools': [],
+ 'host_routes': [],
+ 'ip_version': 4,
+ 'gateway_ip': '10.10.10.1',
+ 'ipv6_address_mode': None,
+ 'ipv6_ra_mode': None,
+ 'segment_id': None,
+ 'service_types': [],
+ 'subnetpool_id': None,
+ }
+
+ # Overwrite default attributes.
+ subnet_attrs.update(attrs)
+
+ subnet = fakes.FakeResource(info=copy.deepcopy(subnet_attrs),
+ loaded=True)
+
+ # Set attributes with special mappings in OpenStack SDK.
+ subnet.project_id = subnet_attrs['tenant_id']
+
+ return subnet
+
+ @staticmethod
+ def create_subnets(attrs=None, count=2):
+ """Create multiple fake subnets.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of subnets to fake
+ :return:
+ A list of FakeResource objects faking the subnets
+ """
+ subnets = []
+ for i in range(0, count):
+ subnets.append(FakeSubnet.create_one_subnet(attrs))
+
+ return subnets
+
+ @staticmethod
+ def get_subnets(subnets=None, count=2):
+ """Get an iterable MagicMock object with a list of faked subnets.
+
+ If subnets list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List subnets:
+ A list of FakeResource objects faking subnets
+ :param int count:
+ The number of subnets to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ subnets
+ """
+ if subnets is None:
+ subnets = FakeSubnet.create_subnets(count)
+ return mock.MagicMock(side_effect=subnets)
+
+
+class FakeFloatingIP(object):
+ """Fake one or more floating ip."""
+
+ @staticmethod
+ def create_one_floating_ip(attrs=None):
+ """Create a fake floating ip.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object, with id, ip, and so on
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ floating_ip_attrs = {
+ 'id': 'floating-ip-id-' + uuid.uuid4().hex,
+ 'floating_ip_address': '1.0.9.0',
+ 'fixed_ip_address': '2.0.9.0',
+ 'dns_domain': None,
+ 'dns_name': None,
+ 'status': 'DOWN',
+ 'floating_network_id': 'network-id-' + uuid.uuid4().hex,
+ 'router_id': 'router-id-' + uuid.uuid4().hex,
+ 'port_id': 'port-id-' + uuid.uuid4().hex,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ }
+
+ # Overwrite default attributes.
+ floating_ip_attrs.update(attrs)
+
+ floating_ip = fakes.FakeResource(
+ info=copy.deepcopy(floating_ip_attrs),
+ loaded=True
+ )
+
+ # Set attributes with special mappings in OpenStack SDK.
+ floating_ip.project_id = floating_ip_attrs['tenant_id']
+
+ return floating_ip
+
+ @staticmethod
+ def create_floating_ips(attrs=None, count=2):
+ """Create multiple fake floating ips.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of floating ips to fake
+ :return:
+ A list of FakeResource objects faking the floating ips
+ """
+ floating_ips = []
+ for i in range(0, count):
+ floating_ips.append(FakeFloatingIP.create_one_floating_ip(attrs))
+ return floating_ips
+
+ @staticmethod
+ def get_floating_ips(floating_ips=None, count=2):
+ """Get an iterable MagicMock object with a list of faked floating ips.
+
+ If floating_ips list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List floating ips:
+ A list of FakeResource objects faking floating ips
+ :param int count:
+ The number of floating ips to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ floating ips
+ """
+ if floating_ips is None:
+ floating_ips = FakeFloatingIP.create_floating_ips(count)
+ return mock.MagicMock(side_effect=floating_ips)
+
+
+class FakeSubnetPool(object):
+ """Fake one or more subnet pools."""
+
+ @staticmethod
+ def create_one_subnet_pool(attrs=None):
+ """Create a fake subnet pool.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object faking the subnet pool
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ subnet_pool_attrs = {
+ 'id': 'subnet-pool-id-' + uuid.uuid4().hex,
+ 'name': 'subnet-pool-name-' + uuid.uuid4().hex,
+ 'prefixes': ['10.0.0.0/24', '10.1.0.0/24'],
+ 'default_prefixlen': '8',
+ 'address_scope_id': 'address-scope-id-' + uuid.uuid4().hex,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'is_default': False,
+ 'shared': False,
+ 'max_prefixlen': '32',
+ 'min_prefixlen': '8',
+ 'default_quota': None,
+ 'ip_version': '4',
+ }
+
+ # Overwrite default attributes.
+ subnet_pool_attrs.update(attrs)
+
+ subnet_pool = fakes.FakeResource(
+ info=copy.deepcopy(subnet_pool_attrs),
+ loaded=True
+ )
+
+ # Set attributes with special mapping in OpenStack SDK.
+ subnet_pool.project_id = subnet_pool_attrs['tenant_id']
+
+ return subnet_pool
+
+ @staticmethod
+ def create_subnet_pools(attrs=None, count=2):
+ """Create multiple fake subnet pools.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :param int count:
+ The number of subnet pools to fake
+ :return:
+ A list of FakeResource objects faking the subnet pools
+ """
+ subnet_pools = []
+ for i in range(0, count):
+ subnet_pools.append(
+ FakeSubnetPool.create_one_subnet_pool(attrs)
+ )
+
+ return subnet_pools
+
+ @staticmethod
+ def get_subnet_pools(subnet_pools=None, count=2):
+ """Get an iterable MagicMock object with a list of faked subnet pools.
+
+ If subnet_pools list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List subnet pools:
+ A list of FakeResource objects faking subnet pools
+ :param int count:
+ The number of subnet pools to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ subnet pools
+ """
+ if subnet_pools is None:
+ subnet_pools = FakeSubnetPool.create_subnet_pools(count)
+ return mock.MagicMock(side_effect=subnet_pools)
diff --git a/openstackclient/tests/unit/network/v2/test_address_scope.py b/openstackclient/tests/unit/network/v2/test_address_scope.py
new file mode 100644
index 00000000..6d3f4011
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_address_scope.py
@@ -0,0 +1,399 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import address_scope
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestAddressScope(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestAddressScope, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestCreateAddressScope(TestAddressScope):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # The new address scope created.
+ new_address_scope = (
+ network_fakes.FakeAddressScope.create_one_address_scope(
+ attrs={
+ 'tenant_id': project.id,
+ }
+ ))
+ columns = (
+ 'id',
+ 'ip_version',
+ 'name',
+ 'project_id',
+ 'shared'
+ )
+ data = (
+ new_address_scope.id,
+ new_address_scope.ip_version,
+ new_address_scope.name,
+ new_address_scope.project_id,
+ new_address_scope.shared,
+ )
+
+ def setUp(self):
+ super(TestCreateAddressScope, self).setUp()
+ self.network.create_address_scope = mock.Mock(
+ return_value=self.new_address_scope)
+
+ # Get the command object to test
+ self.cmd = address_scope.CreateAddressScope(self.app, self.namespace)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self.new_address_scope.name,
+ ]
+ verifylist = [
+ ('project', None),
+ ('ip_version', self.new_address_scope.ip_version),
+ ('name', self.new_address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_address_scope.assert_called_once_with(**{
+ 'ip_version': self.new_address_scope.ip_version,
+ 'name': self.new_address_scope.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options(self):
+ arglist = [
+ '--ip-version', str(self.new_address_scope.ip_version),
+ '--share',
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ self.new_address_scope.name,
+ ]
+ verifylist = [
+ ('ip_version', self.new_address_scope.ip_version),
+ ('share', True),
+ ('project', self.project.name),
+ ('project_domain', self.domain.name),
+ ('name', self.new_address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_address_scope.assert_called_once_with(**{
+ 'ip_version': self.new_address_scope.ip_version,
+ 'shared': True,
+ 'tenant_id': self.project.id,
+ 'name': self.new_address_scope.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_no_share(self):
+ arglist = [
+ '--no-share',
+ self.new_address_scope.name,
+ ]
+ verifylist = [
+ ('no_share', True),
+ ('name', self.new_address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_address_scope.assert_called_once_with(**{
+ 'ip_version': self.new_address_scope.ip_version,
+ 'shared': False,
+ 'name': self.new_address_scope.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteAddressScope(TestAddressScope):
+
+ # The address scope to delete.
+ _address_scopes = (
+ network_fakes.FakeAddressScope.create_address_scopes(count=2))
+
+ def setUp(self):
+ super(TestDeleteAddressScope, self).setUp()
+ self.network.delete_address_scope = mock.Mock(return_value=None)
+ self.network.find_address_scope = (
+ network_fakes.FakeAddressScope.get_address_scopes(
+ address_scopes=self._address_scopes)
+ )
+
+ # Get the command object to test
+ self.cmd = address_scope.DeleteAddressScope(self.app, self.namespace)
+
+ def test_address_scope_delete(self):
+ arglist = [
+ self._address_scopes[0].name,
+ ]
+ verifylist = [
+ ('address_scope', [self._address_scopes[0].name]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_address_scope.assert_called_once_with(
+ self._address_scopes[0].name, ignore_missing=False)
+ self.network.delete_address_scope.assert_called_once_with(
+ self._address_scopes[0])
+ self.assertIsNone(result)
+
+ def test_multi_address_scopes_delete(self):
+ arglist = []
+ verifylist = []
+
+ for a in self._address_scopes:
+ arglist.append(a.name)
+ verifylist = [
+ ('address_scope', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for a in self._address_scopes:
+ calls.append(call(a))
+ self.network.delete_address_scope.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_address_scopes_delete_with_exception(self):
+ arglist = [
+ self._address_scopes[0].name,
+ 'unexist_address_scope',
+ ]
+ verifylist = [
+ ('address_scope',
+ [self._address_scopes[0].name, 'unexist_address_scope']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._address_scopes[0], exceptions.CommandError]
+ self.network.find_address_scope = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 address scopes failed to delete.', str(e))
+
+ self.network.find_address_scope.assert_any_call(
+ self._address_scopes[0].name, ignore_missing=False)
+ self.network.find_address_scope.assert_any_call(
+ 'unexist_address_scope', ignore_missing=False)
+ self.network.delete_address_scope.assert_called_once_with(
+ self._address_scopes[0]
+ )
+
+
+class TestListAddressScope(TestAddressScope):
+
+ # The address scopes to list up.
+ address_scopes = (
+ network_fakes.FakeAddressScope.create_address_scopes(count=3))
+ columns = (
+ 'ID',
+ 'Name',
+ 'IP Version',
+ 'Shared',
+ 'Project',
+ )
+ data = []
+ for scope in address_scopes:
+ data.append((
+ scope.id,
+ scope.name,
+ scope.ip_version,
+ scope.shared,
+ scope.project_id,
+ ))
+
+ def setUp(self):
+ super(TestListAddressScope, self).setUp()
+ self.network.address_scopes = mock.Mock(
+ return_value=self.address_scopes)
+
+ # Get the command object to test
+ self.cmd = address_scope.ListAddressScope(self.app, self.namespace)
+
+ def test_address_scope_list(self):
+ arglist = []
+ verifylist = []
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.address_scopes.assert_called_once_with(**{})
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestSetAddressScope(TestAddressScope):
+
+ # The address scope to set.
+ _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
+
+ def setUp(self):
+ super(TestSetAddressScope, self).setUp()
+ self.network.update_address_scope = mock.Mock(return_value=None)
+ self.network.find_address_scope = mock.Mock(
+ return_value=self._address_scope)
+
+ # Get the command object to test
+ self.cmd = address_scope.SetAddressScope(self.app, self.namespace)
+
+ def test_set_nothing(self):
+ arglist = [self._address_scope.name, ]
+ verifylist = [
+ ('address_scope', self._address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_address_scope.assert_called_with(
+ self._address_scope, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_name_and_share(self):
+ arglist = [
+ '--name', 'new_address_scope',
+ '--share',
+ self._address_scope.name,
+ ]
+ verifylist = [
+ ('name', 'new_address_scope'),
+ ('share', True),
+ ('address_scope', self._address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'name': "new_address_scope",
+ 'shared': True,
+ }
+ self.network.update_address_scope.assert_called_with(
+ self._address_scope, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_share(self):
+ arglist = [
+ '--no-share',
+ self._address_scope.name,
+ ]
+ verifylist = [
+ ('no_share', True),
+ ('address_scope', self._address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'shared': False,
+ }
+ self.network.update_address_scope.assert_called_with(
+ self._address_scope, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowAddressScope(TestAddressScope):
+
+ # The address scope to show.
+ _address_scope = (
+ network_fakes.FakeAddressScope.create_one_address_scope())
+ columns = (
+ 'id',
+ 'ip_version',
+ 'name',
+ 'project_id',
+ 'shared',
+ )
+ data = (
+ _address_scope.id,
+ _address_scope.ip_version,
+ _address_scope.name,
+ _address_scope.project_id,
+ _address_scope.shared,
+ )
+
+ def setUp(self):
+ super(TestShowAddressScope, self).setUp()
+ self.network.find_address_scope = mock.Mock(
+ return_value=self._address_scope)
+
+ # Get the command object to test
+ self.cmd = address_scope.ShowAddressScope(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._address_scope.name,
+ ]
+ verifylist = [
+ ('address_scope', self._address_scope.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_address_scope.assert_called_once_with(
+ self._address_scope.name, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip.py b/openstackclient/tests/unit/network/v2/test_floating_ip.py
new file mode 100644
index 00000000..a40e48f4
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip.py
@@ -0,0 +1,565 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import floating_ip
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+# Tests for Neutron network
+#
+class TestFloatingIPNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestFloatingIPNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestCreateFloatingIPNetwork(TestFloatingIPNetwork):
+
+ # Fake data for option tests.
+ floating_network = network_fakes.FakeNetwork.create_one_network()
+ subnet = network_fakes.FakeSubnet.create_one_subnet()
+ port = network_fakes.FakePort.create_one_port()
+
+ # The floating ip to be deleted.
+ floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip(
+ attrs={
+ 'floating_network_id': floating_network.id,
+ 'port_id': port.id,
+ }
+ )
+
+ columns = (
+ 'dns_domain',
+ 'dns_name',
+ 'fixed_ip_address',
+ 'floating_ip_address',
+ 'floating_network_id',
+ 'id',
+ 'port_id',
+ 'project_id',
+ 'router_id',
+ 'status',
+ )
+
+ data = (
+ floating_ip.dns_domain,
+ floating_ip.dns_name,
+ floating_ip.fixed_ip_address,
+ floating_ip.floating_ip_address,
+ floating_ip.floating_network_id,
+ floating_ip.id,
+ floating_ip.port_id,
+ floating_ip.project_id,
+ floating_ip.router_id,
+ floating_ip.status,
+ )
+
+ def setUp(self):
+ super(TestCreateFloatingIPNetwork, self).setUp()
+
+ self.network.create_ip = mock.Mock(return_value=self.floating_ip)
+
+ self.network.find_network = mock.Mock(
+ return_value=self.floating_network)
+ self.network.find_subnet = mock.Mock(return_value=self.subnet)
+ self.network.find_port = mock.Mock(return_value=self.port)
+
+ # Get the command object to test
+ self.cmd = floating_ip.CreateFloatingIP(self.app, self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self.floating_ip.floating_network_id,
+ ]
+ verifylist = [
+ ('network', self.floating_ip.floating_network_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_ip.assert_called_once_with(**{
+ 'floating_network_id': self.floating_ip.floating_network_id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options(self):
+ arglist = [
+ '--subnet', self.subnet.id,
+ '--port', self.floating_ip.port_id,
+ '--floating-ip-address', self.floating_ip.floating_ip_address,
+ '--fixed-ip-address', self.floating_ip.fixed_ip_address,
+ self.floating_ip.floating_network_id,
+ ]
+ verifylist = [
+ ('subnet', self.subnet.id),
+ ('port', self.floating_ip.port_id),
+ ('floating_ip_address', self.floating_ip.floating_ip_address),
+ ('fixed_ip_address', self.floating_ip.fixed_ip_address),
+ ('network', self.floating_ip.floating_network_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_ip.assert_called_once_with(**{
+ 'subnet_id': self.subnet.id,
+ 'port_id': self.floating_ip.port_id,
+ 'floating_ip_address': self.floating_ip.floating_ip_address,
+ 'fixed_ip_address': self.floating_ip.fixed_ip_address,
+ 'floating_network_id': self.floating_ip.floating_network_id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
+
+ # The floating ips to be deleted.
+ floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2)
+
+ def setUp(self):
+ super(TestDeleteFloatingIPNetwork, self).setUp()
+
+ self.network.delete_ip = mock.Mock(return_value=None)
+ self.network.find_ip = (
+ network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
+
+ # Get the command object to test
+ self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace)
+
+ def test_floating_ip_delete(self):
+ arglist = [
+ self.floating_ips[0].id,
+ ]
+ verifylist = [
+ ('floating_ip', [self.floating_ips[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.find_ip.assert_called_once_with(
+ self.floating_ips[0].id, ignore_missing=False)
+ self.network.delete_ip.assert_called_once_with(self.floating_ips[0])
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete(self):
+ arglist = []
+ verifylist = []
+
+ for f in self.floating_ips:
+ arglist.append(f.id)
+ verifylist = [
+ ('floating_ip', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for f in self.floating_ips:
+ calls.append(call(f))
+ self.network.delete_ip.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete_with_exception(self):
+ arglist = [
+ self.floating_ips[0].id,
+ 'unexist_floating_ip',
+ ]
+ verifylist = [
+ ('floating_ip',
+ [self.floating_ips[0].id, 'unexist_floating_ip']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.floating_ips[0], exceptions.CommandError]
+ self.network.find_ip = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
+
+ self.network.find_ip.assert_any_call(
+ self.floating_ips[0].id, ignore_missing=False)
+ self.network.find_ip.assert_any_call(
+ 'unexist_floating_ip', ignore_missing=False)
+ self.network.delete_ip.assert_called_once_with(
+ self.floating_ips[0]
+ )
+
+
+class TestListFloatingIPNetwork(TestFloatingIPNetwork):
+
+ # The floating ips to list up
+ floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=3)
+
+ columns = (
+ 'ID',
+ 'Floating IP Address',
+ 'Fixed IP Address',
+ 'Port',
+ )
+
+ data = []
+ for ip in floating_ips:
+ data.append((
+ ip.id,
+ ip.floating_ip_address,
+ ip.fixed_ip_address,
+ ip.port_id,
+ ))
+
+ def setUp(self):
+ super(TestListFloatingIPNetwork, self).setUp()
+
+ self.network.ips = mock.Mock(return_value=self.floating_ips)
+
+ # Get the command object to test
+ self.cmd = floating_ip.ListFloatingIP(self.app, self.namespace)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ips.assert_called_once_with(**{})
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowFloatingIPNetwork(TestFloatingIPNetwork):
+
+ # The floating ip to display.
+ floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip()
+
+ columns = (
+ 'dns_domain',
+ 'dns_name',
+ 'fixed_ip_address',
+ 'floating_ip_address',
+ 'floating_network_id',
+ 'id',
+ 'port_id',
+ 'project_id',
+ 'router_id',
+ 'status',
+ )
+
+ data = (
+ floating_ip.dns_domain,
+ floating_ip.dns_name,
+ floating_ip.fixed_ip_address,
+ floating_ip.floating_ip_address,
+ floating_ip.floating_network_id,
+ floating_ip.id,
+ floating_ip.port_id,
+ floating_ip.tenant_id,
+ floating_ip.router_id,
+ floating_ip.status,
+ )
+
+ def setUp(self):
+ super(TestShowFloatingIPNetwork, self).setUp()
+
+ self.network.find_ip = mock.Mock(return_value=self.floating_ip)
+
+ # Get the command object to test
+ self.cmd = floating_ip.ShowFloatingIP(self.app, self.namespace)
+
+ def test_floating_ip_show(self):
+ arglist = [
+ self.floating_ip.id,
+ ]
+ verifylist = [
+ ('floating_ip', self.floating_ip.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_ip.assert_called_once_with(
+ self.floating_ip.id,
+ ignore_missing=False
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+# Tests for Nova network
+#
+class TestFloatingIPCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestFloatingIPCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ip to be deleted.
+ floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
+
+ columns = (
+ 'fixed_ip',
+ 'id',
+ 'instance_id',
+ 'ip',
+ 'pool',
+ )
+
+ data = (
+ floating_ip.fixed_ip,
+ floating_ip.id,
+ floating_ip.instance_id,
+ floating_ip.ip,
+ floating_ip.pool,
+ )
+
+ def setUp(self):
+ super(TestCreateFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.create.return_value = self.floating_ip
+
+ # Get the command object to test
+ self.cmd = floating_ip.CreateFloatingIP(self.app, None)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self.floating_ip.pool,
+ ]
+ verifylist = [
+ ('network', self.floating_ip.pool),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.create.assert_called_once_with(
+ self.floating_ip.pool)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ips to be deleted.
+ floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
+
+ def setUp(self):
+ super(TestDeleteFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.delete.return_value = None
+
+ # Return value of utils.find_resource()
+ self.compute.floating_ips.get = (
+ compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
+
+ # Get the command object to test
+ self.cmd = floating_ip.DeleteFloatingIP(self.app, None)
+
+ def test_floating_ip_delete(self):
+ arglist = [
+ self.floating_ips[0].id,
+ ]
+ verifylist = [
+ ('floating_ip', [self.floating_ips[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.delete.assert_called_once_with(
+ self.floating_ips[0].id
+ )
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete(self):
+ arglist = []
+ verifylist = []
+
+ for f in self.floating_ips:
+ arglist.append(f.id)
+ verifylist = [
+ ('floating_ip', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for f in self.floating_ips:
+ calls.append(call(f.id))
+ self.compute.floating_ips.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete_with_exception(self):
+ arglist = [
+ self.floating_ips[0].id,
+ 'unexist_floating_ip',
+ ]
+ verifylist = [
+ ('floating_ip',
+ [self.floating_ips[0].id, 'unexist_floating_ip']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.floating_ips[0], exceptions.CommandError]
+ self.compute.floating_ips.get = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+ self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 floating_ips failed to delete.', str(e))
+
+ self.compute.floating_ips.get.assert_any_call(
+ self.floating_ips[0].id)
+ self.compute.floating_ips.get.assert_any_call(
+ 'unexist_floating_ip')
+ self.compute.floating_ips.delete.assert_called_once_with(
+ self.floating_ips[0].id
+ )
+
+
+class TestListFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ips to be list up
+ floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=3)
+
+ columns = (
+ 'ID',
+ 'Floating IP Address',
+ 'Fixed IP Address',
+ 'Server',
+ 'Pool',
+ )
+
+ data = []
+ for ip in floating_ips:
+ data.append((
+ ip.id,
+ ip.ip,
+ ip.fixed_ip,
+ ip.instance_id,
+ ip.pool,
+ ))
+
+ def setUp(self):
+ super(TestListFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ips.list.return_value = self.floating_ips
+
+ # Get the command object to test
+ self.cmd = floating_ip.ListFloatingIP(self.app, None)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ips.list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowFloatingIPCompute(TestFloatingIPCompute):
+
+ # The floating ip to display.
+ floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
+
+ columns = (
+ 'fixed_ip',
+ 'id',
+ 'instance_id',
+ 'ip',
+ 'pool',
+ )
+
+ data = (
+ floating_ip.fixed_ip,
+ floating_ip.id,
+ floating_ip.instance_id,
+ floating_ip.ip,
+ floating_ip.pool,
+ )
+
+ def setUp(self):
+ super(TestShowFloatingIPCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Return value of utils.find_resource()
+ self.compute.floating_ips.get.return_value = self.floating_ip
+
+ # Get the command object to test
+ self.cmd = floating_ip.ShowFloatingIP(self.app, None)
+
+ def test_floating_ip_show(self):
+ arglist = [
+ self.floating_ip.id,
+ ]
+ verifylist = [
+ ('floating_ip', self.floating_ip.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py b/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py
new file mode 100644
index 00000000..11d01d36
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_floating_ip_pool.py
@@ -0,0 +1,97 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import floating_ip_pool
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+
+
+# Tests for Network API v2
+#
+class TestFloatingIPPoolNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestFloatingIPPoolNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestListFloatingIPPoolNetwork(TestFloatingIPPoolNetwork):
+
+ def setUp(self):
+ super(TestListFloatingIPPoolNetwork, self).setUp()
+
+ # Get the command object to test
+ self.cmd = floating_ip_pool.ListFloatingIPPool(self.app,
+ self.namespace)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+
+# Tests for Compute network
+#
+class TestFloatingIPPoolCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestFloatingIPPoolCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+class TestListFloatingIPPoolCompute(TestFloatingIPPoolCompute):
+
+ # The floating ip pools to list up
+ floating_ip_pools = \
+ compute_fakes.FakeFloatingIPPool.create_floating_ip_pools(count=3)
+
+ columns = (
+ 'Name',
+ )
+
+ data = []
+ for pool in floating_ip_pools:
+ data.append((
+ pool.name,
+ ))
+
+ def setUp(self):
+ super(TestListFloatingIPPoolCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.floating_ip_pools.list.return_value = \
+ self.floating_ip_pools
+
+ # Get the command object to test
+ self.cmd = floating_ip_pool.ListFloatingIPPool(self.app, None)
+
+ def test_floating_ip_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.floating_ip_pools.list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_ip_availability.py b/openstackclient/tests/unit/network/v2/test_ip_availability.py
new file mode 100644
index 00000000..c929ab82
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_ip_availability.py
@@ -0,0 +1,172 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+
+from osc_lib import utils as common_utils
+
+from openstackclient.network.v2 import ip_availability
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestIPAvailability(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestIPAvailability, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+
+ self.project = identity_fakes.FakeProject.create_one_project()
+ self.projects_mock.get.return_value = self.project
+
+
+class TestListIPAvailability(TestIPAvailability):
+
+ _ip_availability = \
+ network_fakes.FakeIPAvailability.create_ip_availability(count=3)
+ columns = (
+ 'Network ID',
+ 'Network Name',
+ 'Total IPs',
+ 'Used IPs',
+ )
+ data = []
+ for net in _ip_availability:
+ data.append((
+ net.network_id,
+ net.network_name,
+ net.total_ips,
+ net.used_ips,
+ ))
+
+ def setUp(self):
+ super(TestListIPAvailability, self).setUp()
+
+ self.cmd = ip_availability.ListIPAvailability(
+ self.app, self.namespace)
+ self.network.network_ip_availabilities = mock.Mock(
+ return_value=self._ip_availability)
+
+ def test_list_no_options(self):
+ arglist = []
+ verifylist = []
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'ip_version': 4}
+
+ self.network.network_ip_availabilities.assert_called_once_with(
+ **filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_list_ip_version(self):
+ arglist = [
+ '--ip-version', str(4),
+ ]
+ verifylist = [
+ ('ip_version', 4)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'ip_version': 4}
+
+ self.network.network_ip_availabilities.assert_called_once_with(
+ **filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_list_project(self):
+ arglist = [
+ '--project', self.project.name
+ ]
+ verifylist = [
+ ('project', self.project.name)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'tenant_id': self.project.id,
+ 'ip_version': 4}
+
+ self.network.network_ip_availabilities.assert_called_once_with(
+ **filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowIPAvailability(TestIPAvailability):
+
+ _ip_availability = \
+ network_fakes.FakeIPAvailability.create_one_ip_availability()
+
+ columns = (
+ 'network_id',
+ 'network_name',
+ 'project_id',
+ 'subnet_ip_availability',
+ 'total_ips',
+ 'used_ips',
+ )
+ data = (
+ _ip_availability.network_id,
+ _ip_availability.network_name,
+ _ip_availability.tenant_id,
+ common_utils.format_list(
+ _ip_availability.subnet_ip_availability),
+ _ip_availability.total_ips,
+ _ip_availability.used_ips,
+ )
+
+ def setUp(self):
+ super(TestShowIPAvailability, self).setUp()
+
+ self.network.find_network_ip_availability = mock.Mock(
+ return_value=self._ip_availability)
+
+ # Get the command object to test
+ self.cmd = ip_availability.ShowIPAvailability(
+ self.app, self.namespace)
+
+ def test_show_no_option(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._ip_availability.network_name,
+ ]
+ verifylist = [
+ ('network', self._ip_availability.network_name)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+ self.network.find_network_ip_availability.assert_called_once_with(
+ self._ip_availability.network_name,
+ ignore_missing=False)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_network.py b/openstackclient/tests/unit/network/v2/test_network.py
new file mode 100644
index 00000000..84ead093
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network.py
@@ -0,0 +1,1059 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.network.v2 import network
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit import fakes
+from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes_v2
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+# Tests for Neutron network
+#
+class TestNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestCreateNetworkIdentityV3(TestNetwork):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # The new network created.
+ _network = network_fakes.FakeNetwork.create_one_network(
+ attrs={
+ 'tenant_id': project.id,
+ 'availability_zone_hints': ["nova"],
+ }
+ )
+
+ columns = (
+ 'admin_state_up',
+ 'availability_zone_hints',
+ 'availability_zones',
+ 'id',
+ 'is_default',
+ 'name',
+ 'port_security_enabled',
+ 'project_id',
+ 'provider_network_type',
+ 'router:external',
+ 'shared',
+ 'status',
+ 'subnets',
+ )
+
+ data = (
+ network._format_admin_state(_network.admin_state_up),
+ utils.format_list(_network.availability_zone_hints),
+ utils.format_list(_network.availability_zones),
+ _network.id,
+ _network.is_default,
+ _network.name,
+ _network.is_port_security_enabled,
+ _network.project_id,
+ _network.provider_network_type,
+ network._format_router_external(_network.is_router_external),
+ _network.shared,
+ _network.status,
+ utils.format_list(_network.subnets),
+ )
+
+ def setUp(self):
+ super(TestCreateNetworkIdentityV3, self).setUp()
+
+ self.network.create_network = mock.Mock(return_value=self._network)
+
+ # Get the command object to test
+ self.cmd = network.CreateNetwork(self.app, self.namespace)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self._network.name,
+ ]
+ verifylist = [
+ ('name', self._network.name),
+ ('enable', True),
+ ('share', None),
+ ('project', None),
+ ('external', False),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_network.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self._network.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options(self):
+ arglist = [
+ "--disable",
+ "--share",
+ "--project", self.project.name,
+ "--project-domain", self.domain.name,
+ "--availability-zone-hint", "nova",
+ "--external", "--default",
+ "--provider-network-type", "vlan",
+ "--provider-physical-network", "physnet1",
+ "--provider-segment", "400",
+ "--transparent-vlan",
+ "--enable-port-security",
+ self._network.name,
+ ]
+ verifylist = [
+ ('disable', True),
+ ('share', True),
+ ('project', self.project.name),
+ ('project_domain', self.domain.name),
+ ('availability_zone_hints', ["nova"]),
+ ('external', True),
+ ('default', True),
+ ('provider_network_type', 'vlan'),
+ ('physical_network', 'physnet1'),
+ ('segmentation_id', '400'),
+ ('transparent_vlan', True),
+ ('enable_port_security', True),
+ ('name', self._network.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_network.assert_called_once_with(**{
+ 'admin_state_up': False,
+ 'availability_zone_hints': ["nova"],
+ 'name': self._network.name,
+ 'shared': True,
+ 'tenant_id': self.project.id,
+ 'is_default': True,
+ 'router:external': True,
+ 'provider:network_type': 'vlan',
+ 'provider:physical_network': 'physnet1',
+ 'provider:segmentation_id': '400',
+ 'vlan_transparent': True,
+ 'port_security_enabled': True,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_other_options(self):
+ arglist = [
+ "--enable",
+ "--no-share",
+ "--disable-port-security",
+ self._network.name,
+ ]
+ verifylist = [
+ ('enable', True),
+ ('no_share', True),
+ ('name', self._network.name),
+ ('external', False),
+ ('disable_port_security', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_network.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self._network.name,
+ 'shared': False,
+ 'port_security_enabled': False,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestCreateNetworkIdentityV2(TestNetwork):
+
+ project = identity_fakes_v2.FakeProject.create_one_project()
+ # The new network created.
+ _network = network_fakes.FakeNetwork.create_one_network(
+ attrs={'tenant_id': project.id}
+ )
+
+ columns = (
+ 'admin_state_up',
+ 'availability_zone_hints',
+ 'availability_zones',
+ 'id',
+ 'is_default',
+ 'name',
+ 'port_security_enabled',
+ 'project_id',
+ 'provider_network_type',
+ 'router:external',
+ 'shared',
+ 'status',
+ 'subnets',
+ )
+
+ data = (
+ network._format_admin_state(_network.admin_state_up),
+ utils.format_list(_network.availability_zone_hints),
+ utils.format_list(_network.availability_zones),
+ _network.id,
+ _network.is_default,
+ _network.name,
+ _network.is_port_security_enabled,
+ _network.project_id,
+ _network.provider_network_type,
+ network._format_router_external(_network.is_router_external),
+ _network.shared,
+ _network.status,
+ utils.format_list(_network.subnets),
+ )
+
+ def setUp(self):
+ super(TestCreateNetworkIdentityV2, self).setUp()
+
+ self.network.create_network = mock.Mock(return_value=self._network)
+
+ # Get the command object to test
+ self.cmd = network.CreateNetwork(self.app, self.namespace)
+
+ # Set identity client v2. And get a shortcut to Identity client.
+ identity_client = identity_fakes_v2.FakeIdentityv2Client(
+ endpoint=fakes.AUTH_URL,
+ token=fakes.AUTH_TOKEN,
+ )
+ self.app.client_manager.identity = identity_client
+ self.identity = self.app.client_manager.identity
+
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.identity.tenants
+ self.projects_mock.get.return_value = self.project
+
+ # There is no DomainManager Mock in fake identity v2.
+
+ def test_create_with_project_identityv2(self):
+ arglist = [
+ "--project", self.project.name,
+ self._network.name,
+ ]
+ verifylist = [
+ ('enable', True),
+ ('share', None),
+ ('name', self._network.name),
+ ('project', self.project.name),
+ ('external', False),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_network.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self._network.name,
+ 'tenant_id': self.project.id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_with_domain_identityv2(self):
+ arglist = [
+ "--project", self.project.name,
+ "--project-domain", "domain-name",
+ self._network.name,
+ ]
+ verifylist = [
+ ('enable', True),
+ ('share', None),
+ ('project', self.project.name),
+ ('project_domain', "domain-name"),
+ ('name', self._network.name),
+ ('external', False),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ self.assertRaises(
+ AttributeError,
+ self.cmd.take_action,
+ parsed_args,
+ )
+
+
+class TestDeleteNetwork(TestNetwork):
+
+ def setUp(self):
+ super(TestDeleteNetwork, self).setUp()
+
+ # The networks to delete
+ self._networks = network_fakes.FakeNetwork.create_networks(count=3)
+
+ self.network.delete_network = mock.Mock(return_value=None)
+
+ self.network.find_network = network_fakes.FakeNetwork.get_networks(
+ networks=self._networks)
+
+ # Get the command object to test
+ self.cmd = network.DeleteNetwork(self.app, self.namespace)
+
+ def test_delete_one_network(self):
+ arglist = [
+ self._networks[0].name,
+ ]
+ verifylist = [
+ ('network', [self._networks[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_network.assert_called_once_with(self._networks[0])
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks(self):
+ arglist = []
+ for n in self._networks:
+ arglist.append(n.id)
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for n in self._networks:
+ calls.append(call(n))
+ self.network.delete_network.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks_exception(self):
+ arglist = [
+ self._networks[0].id,
+ 'xxxx-yyyy-zzzz',
+ self._networks[1].id,
+ ]
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # Fake exception in find_network()
+ ret_find = [
+ self._networks[0],
+ exceptions.NotFound('404'),
+ self._networks[1],
+ ]
+ self.network.find_network = mock.Mock(side_effect=ret_find)
+
+ # Fake exception in delete_network()
+ ret_delete = [
+ None,
+ exceptions.NotFound('404'),
+ ]
+ self.network.delete_network = mock.Mock(side_effect=ret_delete)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ # The second call of find_network() should fail. So delete_network()
+ # was only called twice.
+ calls = [
+ call(self._networks[0]),
+ call(self._networks[1]),
+ ]
+ self.network.delete_network.assert_has_calls(calls)
+
+
+class TestListNetwork(TestNetwork):
+
+ # The networks going to be listed up.
+ _network = network_fakes.FakeNetwork.create_networks(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Subnets',
+ )
+ columns_long = (
+ 'ID',
+ 'Name',
+ 'Status',
+ 'Project',
+ 'State',
+ 'Shared',
+ 'Subnets',
+ 'Network Type',
+ 'Router Type',
+ 'Availability Zones',
+ )
+
+ data = []
+ for net in _network:
+ data.append((
+ net.id,
+ net.name,
+ utils.format_list(net.subnets),
+ ))
+
+ data_long = []
+ for net in _network:
+ data_long.append((
+ net.id,
+ net.name,
+ net.status,
+ net.project_id,
+ network._format_admin_state(net.admin_state_up),
+ net.shared,
+ utils.format_list(net.subnets),
+ net.provider_network_type,
+ network._format_router_external(net.is_router_external),
+ utils.format_list(net.availability_zones),
+ ))
+
+ def setUp(self):
+ super(TestListNetwork, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network.ListNetwork(self.app, self.namespace)
+
+ self.network.networks = mock.Mock(return_value=self._network)
+
+ def test_network_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('external', False),
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.networks.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_list_external(self):
+ arglist = [
+ '--external',
+ ]
+ verifylist = [
+ ('external', True),
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.networks.assert_called_once_with(
+ **{'router:external': True}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_network_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ('external', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.networks.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+
+class TestSetNetwork(TestNetwork):
+
+ # The network to set.
+ _network = network_fakes.FakeNetwork.create_one_network()
+
+ def setUp(self):
+ super(TestSetNetwork, self).setUp()
+
+ self.network.update_network = mock.Mock(return_value=None)
+
+ self.network.find_network = mock.Mock(return_value=self._network)
+
+ # Get the command object to test
+ self.cmd = network.SetNetwork(self.app, self.namespace)
+
+ def test_set_this(self):
+ arglist = [
+ self._network.name,
+ '--enable',
+ '--name', 'noob',
+ '--share',
+ '--external',
+ '--default',
+ '--provider-network-type', 'vlan',
+ '--provider-physical-network', 'physnet1',
+ '--provider-segment', '400',
+ '--no-transparent-vlan',
+ '--enable-port-security',
+ ]
+ verifylist = [
+ ('network', self._network.name),
+ ('enable', True),
+ ('name', 'noob'),
+ ('share', True),
+ ('external', True),
+ ('default', True),
+ ('provider_network_type', 'vlan'),
+ ('physical_network', 'physnet1'),
+ ('segmentation_id', '400'),
+ ('no_transparent_vlan', True),
+ ('enable_port_security', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'name': 'noob',
+ 'admin_state_up': True,
+ 'shared': True,
+ 'router:external': True,
+ 'is_default': True,
+ 'provider:network_type': 'vlan',
+ 'provider:physical_network': 'physnet1',
+ 'provider:segmentation_id': '400',
+ 'vlan_transparent': False,
+ 'port_security_enabled': True,
+ }
+ self.network.update_network.assert_called_once_with(
+ self._network, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_that(self):
+ arglist = [
+ self._network.name,
+ '--disable',
+ '--no-share',
+ '--internal',
+ '--disable-port-security',
+ ]
+ verifylist = [
+ ('network', self._network.name),
+ ('disable', True),
+ ('no_share', True),
+ ('internal', True),
+ ('disable_port_security', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': False,
+ 'shared': False,
+ 'router:external': False,
+ 'port_security_enabled': False,
+ }
+ self.network.update_network.assert_called_once_with(
+ self._network, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_nothing(self):
+ arglist = [self._network.name, ]
+ verifylist = [('network', self._network.name), ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_network.assert_called_once_with(
+ self._network, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowNetwork(TestNetwork):
+
+ # The network to show.
+ _network = network_fakes.FakeNetwork.create_one_network()
+
+ columns = (
+ 'admin_state_up',
+ 'availability_zone_hints',
+ 'availability_zones',
+ 'id',
+ 'is_default',
+ 'name',
+ 'port_security_enabled',
+ 'project_id',
+ 'provider_network_type',
+ 'router:external',
+ 'shared',
+ 'status',
+ 'subnets',
+ )
+
+ data = (
+ network._format_admin_state(_network.admin_state_up),
+ utils.format_list(_network.availability_zone_hints),
+ utils.format_list(_network.availability_zones),
+ _network.id,
+ _network.is_default,
+ _network.name,
+ _network.is_port_security_enabled,
+ _network.project_id,
+ _network.provider_network_type,
+ network._format_router_external(_network.is_router_external),
+ _network.shared,
+ _network.status,
+ utils.format_list(_network.subnets),
+ )
+
+ def setUp(self):
+ super(TestShowNetwork, self).setUp()
+
+ self.network.find_network = mock.Mock(return_value=self._network)
+
+ # Get the command object to test
+ self.cmd = network.ShowNetwork(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._network.name,
+ ]
+ verifylist = [
+ ('network', self._network.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_network.assert_called_once_with(
+ self._network.name, ignore_missing=False)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+# Tests for Nova network
+#
+class TestNetworkCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestNetworkCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateNetworkCompute(TestNetworkCompute):
+
+ # The network to create.
+ _network = compute_fakes.FakeNetwork.create_one_network()
+
+ columns = (
+ 'bridge',
+ 'bridge_interface',
+ 'broadcast',
+ 'cidr',
+ 'cidr_v6',
+ 'created_at',
+ 'deleted',
+ 'deleted_at',
+ 'dhcp_server',
+ 'dhcp_start',
+ 'dns1',
+ 'dns2',
+ 'enable_dhcp',
+ 'gateway',
+ 'gateway_v6',
+ 'host',
+ 'id',
+ 'injected',
+ 'label',
+ 'mtu',
+ 'multi_host',
+ 'netmask',
+ 'netmask_v6',
+ 'priority',
+ 'project_id',
+ 'rxtx_base',
+ 'share_address',
+ 'updated_at',
+ 'vlan',
+ 'vpn_private_address',
+ 'vpn_public_address',
+ 'vpn_public_port',
+ )
+
+ data = (
+ _network.bridge,
+ _network.bridge_interface,
+ _network.broadcast,
+ _network.cidr,
+ _network.cidr_v6,
+ _network.created_at,
+ _network.deleted,
+ _network.deleted_at,
+ _network.dhcp_server,
+ _network.dhcp_start,
+ _network.dns1,
+ _network.dns2,
+ _network.enable_dhcp,
+ _network.gateway,
+ _network.gateway_v6,
+ _network.host,
+ _network.id,
+ _network.injected,
+ _network.label,
+ _network.mtu,
+ _network.multi_host,
+ _network.netmask,
+ _network.netmask_v6,
+ _network.priority,
+ _network.project_id,
+ _network.rxtx_base,
+ _network.share_address,
+ _network.updated_at,
+ _network.vlan,
+ _network.vpn_private_address,
+ _network.vpn_public_address,
+ _network.vpn_public_port,
+ )
+
+ def setUp(self):
+ super(TestCreateNetworkCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.networks.create.return_value = self._network
+
+ # Get the command object to test
+ self.cmd = network.CreateNetwork(self.app, None)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should raise exception here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ "--subnet", self._network.cidr,
+ self._network.label,
+ ]
+ verifylist = [
+ ('subnet', self._network.cidr),
+ ('name', self._network.label),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.networks.create.assert_called_once_with(**{
+ 'cidr': self._network.cidr,
+ 'label': self._network.label,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteNetworkCompute(TestNetworkCompute):
+
+ def setUp(self):
+ super(TestDeleteNetworkCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # The networks to delete
+ self._networks = compute_fakes.FakeNetwork.create_networks(count=3)
+
+ self.compute.networks.delete.return_value = None
+
+ # Return value of utils.find_resource()
+ self.compute.networks.get = \
+ compute_fakes.FakeNetwork.get_networks(networks=self._networks)
+
+ # Get the command object to test
+ self.cmd = network.DeleteNetwork(self.app, None)
+
+ def test_delete_one_network(self):
+ arglist = [
+ self._networks[0].label,
+ ]
+ verifylist = [
+ ('network', [self._networks[0].label]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.networks.delete.assert_called_once_with(
+ self._networks[0].id)
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks(self):
+ arglist = []
+ for n in self._networks:
+ arglist.append(n.label)
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for n in self._networks:
+ calls.append(call(n.id))
+ self.compute.networks.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_delete_multiple_networks_exception(self):
+ arglist = [
+ self._networks[0].id,
+ 'xxxx-yyyy-zzzz',
+ self._networks[1].id,
+ ]
+ verifylist = [
+ ('network', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # Fake exception in utils.find_resource()
+ # In compute v2, we use utils.find_resource() to find a network.
+ # It calls get() several times, but find() only one time. So we
+ # choose to fake get() always raise exception, then pass through.
+ # And fake find() to find the real network or not.
+ self.compute.networks.get.side_effect = Exception()
+ ret_find = [
+ self._networks[0],
+ Exception(),
+ self._networks[1],
+ ]
+ self.compute.networks.find.side_effect = ret_find
+
+ # Fake exception in delete()
+ ret_delete = [
+ None,
+ Exception(),
+ ]
+ self.compute.networks.delete = mock.Mock(side_effect=ret_delete)
+
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ # The second call of utils.find_resource() should fail. So delete()
+ # was only called twice.
+ calls = [
+ call(self._networks[0].id),
+ call(self._networks[1].id),
+ ]
+ self.compute.networks.delete.assert_has_calls(calls)
+
+
+class TestListNetworkCompute(TestNetworkCompute):
+
+ # The networks going to be listed up.
+ _networks = compute_fakes.FakeNetwork.create_networks(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Subnet',
+ )
+
+ data = []
+ for net in _networks:
+ data.append((
+ net.id,
+ net.label,
+ net.cidr,
+ ))
+
+ def setUp(self):
+ super(TestListNetworkCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.networks.list.return_value = self._networks
+
+ # Get the command object to test
+ self.cmd = network.ListNetwork(self.app, None)
+
+ def test_network_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('external', False),
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.networks.list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowNetworkCompute(TestNetworkCompute):
+
+ # The network to show.
+ _network = compute_fakes.FakeNetwork.create_one_network()
+
+ columns = (
+ 'bridge',
+ 'bridge_interface',
+ 'broadcast',
+ 'cidr',
+ 'cidr_v6',
+ 'created_at',
+ 'deleted',
+ 'deleted_at',
+ 'dhcp_server',
+ 'dhcp_start',
+ 'dns1',
+ 'dns2',
+ 'enable_dhcp',
+ 'gateway',
+ 'gateway_v6',
+ 'host',
+ 'id',
+ 'injected',
+ 'label',
+ 'mtu',
+ 'multi_host',
+ 'netmask',
+ 'netmask_v6',
+ 'priority',
+ 'project_id',
+ 'rxtx_base',
+ 'share_address',
+ 'updated_at',
+ 'vlan',
+ 'vpn_private_address',
+ 'vpn_public_address',
+ 'vpn_public_port',
+ )
+
+ data = (
+ _network.bridge,
+ _network.bridge_interface,
+ _network.broadcast,
+ _network.cidr,
+ _network.cidr_v6,
+ _network.created_at,
+ _network.deleted,
+ _network.deleted_at,
+ _network.dhcp_server,
+ _network.dhcp_start,
+ _network.dns1,
+ _network.dns2,
+ _network.enable_dhcp,
+ _network.gateway,
+ _network.gateway_v6,
+ _network.host,
+ _network.id,
+ _network.injected,
+ _network.label,
+ _network.mtu,
+ _network.multi_host,
+ _network.netmask,
+ _network.netmask_v6,
+ _network.priority,
+ _network.project_id,
+ _network.rxtx_base,
+ _network.share_address,
+ _network.updated_at,
+ _network.vlan,
+ _network.vpn_private_address,
+ _network.vpn_public_address,
+ _network.vpn_public_port,
+ )
+
+ def setUp(self):
+ super(TestShowNetworkCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Return value of utils.find_resource()
+ self.compute.networks.get.return_value = self._network
+
+ # Get the command object to test
+ self.cmd = network.ShowNetwork(self.app, None)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._network.label,
+ ]
+ verifylist = [
+ ('network', self._network.label),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_network_agent.py b/openstackclient/tests/unit/network/v2/test_network_agent.py
new file mode 100644
index 00000000..2f17f41b
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_agent.py
@@ -0,0 +1,294 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.network.v2 import network_agent
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestNetworkAgent(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetworkAgent, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestDeleteNetworkAgent(TestNetworkAgent):
+
+ network_agents = (
+ network_fakes.FakeNetworkAgent.create_network_agents(count=2))
+
+ def setUp(self):
+ super(TestDeleteNetworkAgent, self).setUp()
+ self.network.delete_agent = mock.Mock(return_value=None)
+ self.network.get_agent = (
+ network_fakes.FakeNetworkAgent.get_network_agents(
+ agents=self.network_agents)
+ )
+
+ # Get the command object to test
+ self.cmd = network_agent.DeleteNetworkAgent(self.app, self.namespace)
+
+ def test_network_agent_delete(self):
+ arglist = [
+ self.network_agents[0].id,
+ ]
+ verifylist = [
+ ('network_agent', [self.network_agents[0].id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.get_agent.assert_called_once_with(
+ self.network_agents[0].id, ignore_missing=False)
+ self.network.delete_agent.assert_called_once_with(
+ self.network_agents[0])
+ self.assertIsNone(result)
+
+ def test_multi_network_agents_delete(self):
+ arglist = []
+ verifylist = []
+
+ for n in self.network_agents:
+ arglist.append(n.id)
+ verifylist = [
+ ('network_agent', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for n in self.network_agents:
+ calls.append(call(n))
+ self.network.delete_agent.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_network_agents_delete_with_exception(self):
+ arglist = [
+ self.network_agents[0].id,
+ 'unexist_network_agent',
+ ]
+ verifylist = [
+ ('network_agent',
+ [self.network_agents[0].id, 'unexist_network_agent']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.network_agents[0], exceptions.CommandError]
+ self.network.get_agent = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 network agents failed to delete.', str(e))
+
+ self.network.get_agent.assert_any_call(
+ self.network_agents[0].id, ignore_missing=False)
+ self.network.get_agent.assert_any_call(
+ 'unexist_network_agent', ignore_missing=False)
+ self.network.delete_agent.assert_called_once_with(
+ self.network_agents[0]
+ )
+
+
+class TestListNetworkAgent(TestNetworkAgent):
+
+ network_agents = (
+ network_fakes.FakeNetworkAgent.create_network_agents(count=3))
+
+ columns = (
+ 'ID',
+ 'Agent Type',
+ 'Host',
+ 'Availability Zone',
+ 'Alive',
+ 'State',
+ 'Binary'
+ )
+ data = []
+ for agent in network_agents:
+ data.append((
+ agent.id,
+ agent.agent_type,
+ agent.host,
+ agent.availability_zone,
+ agent.alive,
+ network_agent._format_admin_state(agent.admin_state_up),
+ agent.binary,
+ ))
+
+ def setUp(self):
+ super(TestListNetworkAgent, self).setUp()
+ self.network.agents = mock.Mock(
+ return_value=self.network_agents)
+
+ # Get the command object to test
+ self.cmd = network_agent.ListNetworkAgent(self.app, self.namespace)
+
+ def test_network_agents_list(self):
+ arglist = []
+ verifylist = []
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.agents.assert_called_once_with(**{})
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestSetNetworkAgent(TestNetworkAgent):
+
+ _network_agent = (
+ network_fakes.FakeNetworkAgent.create_one_network_agent())
+
+ def setUp(self):
+ super(TestSetNetworkAgent, self).setUp()
+ self.network.update_agent = mock.Mock(return_value=None)
+ self.network.get_agent = mock.Mock(return_value=self._network_agent)
+
+ # Get the command object to test
+ self.cmd = network_agent.SetNetworkAgent(self.app, self.namespace)
+
+ def test_set_nothing(self):
+ arglist = [
+ self._network_agent.id,
+ ]
+ verifylist = [
+ ('network_agent', self._network_agent.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_agent.assert_called_once_with(
+ self._network_agent, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_all(self):
+ arglist = [
+ '--description', 'new_description',
+ '--enable',
+ self._network_agent.id,
+ ]
+ verifylist = [
+ ('description', 'new_description'),
+ ('enable', True),
+ ('disable', False),
+ ('network_agent', self._network_agent.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'description': 'new_description',
+ 'admin_state_up': True,
+ }
+ self.network.update_agent.assert_called_once_with(
+ self._network_agent, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_with_disable(self):
+ arglist = [
+ '--disable',
+ self._network_agent.id,
+ ]
+ verifylist = [
+ ('enable', False),
+ ('disable', True),
+ ('network_agent', self._network_agent.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': False,
+ }
+ self.network.update_agent.assert_called_once_with(
+ self._network_agent, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowNetworkAgent(TestNetworkAgent):
+
+ _network_agent = (
+ network_fakes.FakeNetworkAgent.create_one_network_agent())
+
+ columns = (
+ 'admin_state_up',
+ 'agent_type',
+ 'alive',
+ 'availability_zone',
+ 'binary',
+ 'configurations',
+ 'host',
+ 'id',
+ )
+ data = (
+ network_agent._format_admin_state(_network_agent.admin_state_up),
+ _network_agent.agent_type,
+ _network_agent.alive,
+ _network_agent.availability_zone,
+ _network_agent.binary,
+ utils.format_dict(_network_agent.configurations),
+ _network_agent.host,
+ _network_agent.id,
+ )
+
+ def setUp(self):
+ super(TestShowNetworkAgent, self).setUp()
+ self.network.get_agent = mock.Mock(
+ return_value=self._network_agent)
+
+ # Get the command object to test
+ self.cmd = network_agent.ShowNetworkAgent(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._network_agent.id,
+ ]
+ verifylist = [
+ ('network_agent', self._network_agent.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_agent.assert_called_once_with(
+ self._network_agent.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_network_rbac.py b/openstackclient/tests/unit/network/v2/test_network_rbac.py
new file mode 100644
index 00000000..1cd18a09
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_rbac.py
@@ -0,0 +1,430 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import network_rbac
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestNetworkRBAC(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetworkRBAC, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+
+
+class TestCreateNetworkRBAC(TestNetworkRBAC):
+
+ network_object = network_fakes.FakeNetwork.create_one_network()
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
+ attrs={'tenant_id': project.id,
+ 'target_tenant': project.id,
+ 'object_id': network_object.id}
+ )
+
+ columns = (
+ 'action',
+ 'id',
+ 'object_id',
+ 'object_type',
+ 'project_id',
+ 'target_project_id',
+ )
+
+ data = [
+ rbac_policy.action,
+ rbac_policy.id,
+ rbac_policy.object_id,
+ rbac_policy.object_type,
+ rbac_policy.tenant_id,
+ rbac_policy.target_tenant,
+ ]
+
+ def setUp(self):
+ super(TestCreateNetworkRBAC, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace)
+
+ self.network.create_rbac_policy = mock.Mock(
+ return_value=self.rbac_policy)
+ self.network.find_network = mock.Mock(
+ return_value=self.network_object)
+ self.projects_mock.get.return_value = self.project
+
+ def test_network_rbac_create_no_type(self):
+ arglist = [
+ '--action', self.rbac_policy.action,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('action', self.rbac_policy.action),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_no_action(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_invalid_type(self):
+ arglist = [
+ '--action', self.rbac_policy.action,
+ '--type', 'invalid_type',
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('action', self.rbac_policy.action),
+ ('type', 'invalid_type'),
+ ('target-project', self.rbac_policy.target_tenant),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_invalid_action(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', 'invalid_action',
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', 'invalid_action'),
+ ('target-project', self.rbac_policy.target_tenant),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', self.rbac_policy.action,
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', self.rbac_policy.action),
+ ('target_project', self.rbac_policy.target_tenant),
+ ('rbac_object', self.rbac_policy.object_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_rbac_policy.assert_called_with(**{
+ 'object_id': self.rbac_policy.object_id,
+ 'object_type': self.rbac_policy.object_type,
+ 'action': self.rbac_policy.action,
+ 'target_tenant': self.rbac_policy.target_tenant,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_network_rbac_create_all_options(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', self.rbac_policy.action,
+ '--target-project', self.rbac_policy.target_tenant,
+ '--project', self.rbac_policy.tenant_id,
+ '--project-domain', self.project.domain_id,
+ '--target-project-domain', self.project.domain_id,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', self.rbac_policy.action),
+ ('target_project', self.rbac_policy.target_tenant),
+ ('project', self.rbac_policy.tenant_id),
+ ('project_domain', self.project.domain_id),
+ ('target_project_domain', self.project.domain_id),
+ ('rbac_object', self.rbac_policy.object_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_rbac_policy.assert_called_with(**{
+ 'object_id': self.rbac_policy.object_id,
+ 'object_type': self.rbac_policy.object_type,
+ 'action': self.rbac_policy.action,
+ 'target_tenant': self.rbac_policy.target_tenant,
+ 'tenant_id': self.rbac_policy.tenant_id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestDeleteNetworkRBAC(TestNetworkRBAC):
+
+ rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2)
+
+ def setUp(self):
+ super(TestDeleteNetworkRBAC, self).setUp()
+ self.network.delete_rbac_policy = mock.Mock(return_value=None)
+ self.network.find_rbac_policy = (
+ network_fakes.FakeNetworkRBAC.get_network_rbacs(
+ rbac_policies=self.rbac_policies)
+ )
+
+ # Get the command object to test
+ self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace)
+
+ def test_network_rbac_delete(self):
+ arglist = [
+ self.rbac_policies[0].id,
+ ]
+ verifylist = [
+ ('rbac_policy', [self.rbac_policies[0].id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0].id, ignore_missing=False)
+ self.network.delete_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0])
+ self.assertIsNone(result)
+
+ def test_multi_network_rbacs_delete(self):
+ arglist = []
+ verifylist = []
+
+ for r in self.rbac_policies:
+ arglist.append(r.id)
+ verifylist = [
+ ('rbac_policy', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for r in self.rbac_policies:
+ calls.append(call(r))
+ self.network.delete_rbac_policy.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_network_policies_delete_with_exception(self):
+ arglist = [
+ self.rbac_policies[0].id,
+ 'unexist_rbac_policy',
+ ]
+ verifylist = [
+ ('rbac_policy',
+ [self.rbac_policies[0].id, 'unexist_rbac_policy']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.rbac_policies[0], exceptions.CommandError]
+ self.network.find_rbac_policy = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e))
+
+ self.network.find_rbac_policy.assert_any_call(
+ self.rbac_policies[0].id, ignore_missing=False)
+ self.network.find_rbac_policy.assert_any_call(
+ 'unexist_rbac_policy', ignore_missing=False)
+ self.network.delete_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0]
+ )
+
+
+class TestListNetworkRABC(TestNetworkRBAC):
+
+ # The network rbac policies going to be listed up.
+ rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=3)
+
+ columns = (
+ 'ID',
+ 'Object Type',
+ 'Object ID',
+ )
+
+ data = []
+ for r in rbac_policies:
+ data.append((
+ r.id,
+ r.object_type,
+ r.object_id,
+ ))
+
+ def setUp(self):
+ super(TestListNetworkRABC, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_rbac.ListNetworkRBAC(self.app, self.namespace)
+
+ self.network.rbac_policies = mock.Mock(return_value=self.rbac_policies)
+
+ def test_network_rbac_list(self):
+ arglist = []
+ verifylist = []
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.rbac_policies.assert_called_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestSetNetworkRBAC(TestNetworkRBAC):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
+ attrs={'target_tenant': project.id})
+
+ def setUp(self):
+ super(TestSetNetworkRBAC, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_rbac.SetNetworkRBAC(self.app, self.namespace)
+
+ self.network.find_rbac_policy = mock.Mock(
+ return_value=self.rbac_policy)
+ self.network.update_rbac_policy = mock.Mock(return_value=None)
+ self.projects_mock.get.return_value = self.project
+
+ def test_network_rbac_set_nothing(self):
+ arglist = [
+ self.rbac_policy.id,
+ ]
+ verifylist = [
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_rbac_policy.assert_called_once_with(
+ self.rbac_policy.id, ignore_missing=False
+ )
+ attrs = {}
+ self.network.update_rbac_policy.assert_called_once_with(
+ self.rbac_policy, **attrs)
+ self.assertIsNone(result)
+
+ def test_network_rbac_set(self):
+ arglist = [
+ '--target-project', self.project.id,
+ self.rbac_policy.id,
+ ]
+ verifylist = [
+ ('target_project', self.project.id),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_rbac_policy.assert_called_once_with(
+ self.rbac_policy.id, ignore_missing=False
+ )
+ attrs = {'target_tenant': self.project.id}
+ self.network.update_rbac_policy.assert_called_once_with(
+ self.rbac_policy, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowNetworkRBAC(TestNetworkRBAC):
+
+ rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac()
+
+ columns = (
+ 'action',
+ 'id',
+ 'object_id',
+ 'object_type',
+ 'project_id',
+ 'target_project_id',
+ )
+
+ data = [
+ rbac_policy.action,
+ rbac_policy.id,
+ rbac_policy.object_id,
+ rbac_policy.object_type,
+ rbac_policy.tenant_id,
+ rbac_policy.target_tenant,
+ ]
+
+ def setUp(self):
+ super(TestShowNetworkRBAC, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_rbac.ShowNetworkRBAC(self.app, self.namespace)
+
+ self.network.find_rbac_policy = mock.Mock(
+ return_value=self.rbac_policy)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_show_all_options(self):
+ arglist = [
+ self.rbac_policy.id,
+ ]
+ verifylist = [
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_rbac_policy.assert_called_with(
+ self.rbac_policy.id, ignore_missing=False
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
diff --git a/openstackclient/tests/unit/network/v2/test_network_segment.py b/openstackclient/tests/unit/network/v2/test_network_segment.py
new file mode 100644
index 00000000..b9fce078
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_segment.py
@@ -0,0 +1,200 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import network_segment
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestNetworkSegment(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetworkSegment, self).setUp()
+
+ # Enable beta commands.
+ self.app.options.os_beta_command = True
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestListNetworkSegment(TestNetworkSegment):
+ _network = network_fakes.FakeNetwork.create_one_network()
+ _network_segments = \
+ network_fakes.FakeNetworkSegment.create_network_segments(count=3)
+
+ columns = (
+ 'ID',
+ 'Network',
+ 'Network Type',
+ 'Segment',
+ )
+ columns_long = columns + (
+ 'Physical Network',
+ )
+
+ data = []
+ for _network_segment in _network_segments:
+ data.append((
+ _network_segment.id,
+ _network_segment.network_id,
+ _network_segment.network_type,
+ _network_segment.segmentation_id,
+ ))
+
+ data_long = []
+ for _network_segment in _network_segments:
+ data_long.append((
+ _network_segment.id,
+ _network_segment.network_id,
+ _network_segment.network_type,
+ _network_segment.segmentation_id,
+ _network_segment.physical_network,
+ ))
+
+ def setUp(self):
+ super(TestListNetworkSegment, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_segment.ListNetworkSegment(self.app, self.namespace)
+
+ self.network.find_network = mock.Mock(return_value=self._network)
+ self.network.segments = mock.Mock(return_value=self._network_segments)
+
+ def test_list_no_option(self):
+ arglist = []
+ verifylist = [
+ ('long', False),
+ ('network', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.segments.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_list_no_beta_commands(self):
+ self.app.options.os_beta_command = False
+ parsed_args = self.check_parser(self.cmd, [], [])
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ('network', None),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.segments.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+ def test_list_network(self):
+ arglist = [
+ '--network',
+ self._network.id,
+ ]
+ verifylist = [
+ ('long', False),
+ ('network', self._network.id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.segments.assert_called_once_with(
+ **{'network_id': self._network.id}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestShowNetworkSegment(TestNetworkSegment):
+
+ # The network segment to show.
+ _network_segment = \
+ network_fakes.FakeNetworkSegment.create_one_network_segment()
+
+ columns = (
+ 'id',
+ 'network_id',
+ 'network_type',
+ 'physical_network',
+ 'segmentation_id',
+ )
+
+ data = (
+ _network_segment.id,
+ _network_segment.network_id,
+ _network_segment.network_type,
+ _network_segment.physical_network,
+ _network_segment.segmentation_id,
+ )
+
+ def setUp(self):
+ super(TestShowNetworkSegment, self).setUp()
+
+ self.network.find_segment = mock.Mock(
+ return_value=self._network_segment
+ )
+
+ # Get the command object to test
+ self.cmd = network_segment.ShowNetworkSegment(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, [], [])
+
+ def test_show_no_beta_commands(self):
+ arglist = [
+ self._network_segment.id,
+ ]
+ verifylist = [
+ ('network_segment', self._network_segment.id),
+ ]
+ self.app.options.os_beta_command = False
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._network_segment.id,
+ ]
+ verifylist = [
+ ('network_segment', self._network_segment.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_segment.assert_called_once_with(
+ self._network_segment.id,
+ ignore_missing=False
+ )
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_port.py b/openstackclient/tests/unit/network/v2/test_port.py
new file mode 100644
index 00000000..d5d7f330
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_port.py
@@ -0,0 +1,696 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import mock
+
+from mock import call
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.network.v2 import port
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestPort(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestPort, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+ def _get_common_cols_data(self, fake_port):
+ columns = (
+ 'admin_state_up',
+ 'allowed_address_pairs',
+ 'binding_host_id',
+ 'binding_profile',
+ 'binding_vif_details',
+ 'binding_vif_type',
+ 'binding_vnic_type',
+ 'device_id',
+ 'device_owner',
+ 'dns_assignment',
+ 'dns_name',
+ 'extra_dhcp_opts',
+ 'fixed_ips',
+ 'id',
+ 'mac_address',
+ 'name',
+ 'network_id',
+ 'port_security_enabled',
+ 'project_id',
+ 'security_groups',
+ 'status',
+ )
+
+ data = (
+ port._format_admin_state(fake_port.admin_state_up),
+ utils.format_list_of_dicts(fake_port.allowed_address_pairs),
+ fake_port.binding_host_id,
+ utils.format_dict(fake_port.binding_profile),
+ utils.format_dict(fake_port.binding_vif_details),
+ fake_port.binding_vif_type,
+ fake_port.binding_vnic_type,
+ fake_port.device_id,
+ fake_port.device_owner,
+ utils.format_list_of_dicts(fake_port.dns_assignment),
+ fake_port.dns_name,
+ utils.format_list_of_dicts(fake_port.extra_dhcp_opts),
+ utils.format_list_of_dicts(fake_port.fixed_ips),
+ fake_port.id,
+ fake_port.mac_address,
+ fake_port.name,
+ fake_port.network_id,
+ fake_port.port_security_enabled,
+ fake_port.project_id,
+ utils.format_list(fake_port.security_groups),
+ fake_port.status,
+ )
+
+ return columns, data
+
+
+class TestCreatePort(TestPort):
+
+ _port = network_fakes.FakePort.create_one_port()
+
+ def setUp(self):
+ super(TestCreatePort, self).setUp()
+
+ self.network.create_port = mock.Mock(return_value=self._port)
+ fake_net = network_fakes.FakeNetwork.create_one_network({
+ 'id': self._port.network_id,
+ })
+ self.network.find_network = mock.Mock(return_value=fake_net)
+ self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
+ self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
+ # Get the command object to test
+ self.cmd = port.CreatePort(self.app, self.namespace)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--network', self._port.network_id,
+ 'test-port',
+ ]
+ verifylist = [
+ ('network', self._port.network_id,),
+ ('enable', True),
+ ('name', 'test-port'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'network_id': self._port.network_id,
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+ def test_create_full_options(self):
+ arglist = [
+ '--mac-address', 'aa:aa:aa:aa:aa:aa',
+ '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2'
+ % self.fake_subnet.id,
+ '--device', 'deviceid',
+ '--device-owner', 'fakeowner',
+ '--disable',
+ '--vnic-type', 'macvtap',
+ '--binding-profile', 'foo=bar',
+ '--binding-profile', 'foo2=bar2',
+ '--network', self._port.network_id,
+ 'test-port',
+
+ ]
+ verifylist = [
+ ('mac_address', 'aa:aa:aa:aa:aa:aa'),
+ (
+ 'fixed_ip',
+ [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}]
+ ),
+ ('device', 'deviceid'),
+ ('device_owner', 'fakeowner'),
+ ('disable', True),
+ ('vnic_type', 'macvtap'),
+ ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
+ ('network', self._port.network_id),
+ ('name', 'test-port'),
+
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'mac_address': 'aa:aa:aa:aa:aa:aa',
+ 'fixed_ips': [{'subnet_id': self.fake_subnet.id,
+ 'ip_address': '10.0.0.2'}],
+ 'device_id': 'deviceid',
+ 'device_owner': 'fakeowner',
+ 'admin_state_up': False,
+ 'binding:vnic_type': 'macvtap',
+ 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
+ 'network_id': self._port.network_id,
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+ def test_create_invalid_json_binding_profile(self):
+ arglist = [
+ '--network', self._port.network_id,
+ '--binding-profile', '{"parent_name":"fake_parent"',
+ 'test-port',
+ ]
+ self.assertRaises(argparse.ArgumentTypeError,
+ self.check_parser,
+ self.cmd,
+ arglist,
+ None)
+
+ def test_create_invalid_key_value_binding_profile(self):
+ arglist = [
+ '--network', self._port.network_id,
+ '--binding-profile', 'key',
+ 'test-port',
+ ]
+ self.assertRaises(argparse.ArgumentTypeError,
+ self.check_parser,
+ self.cmd,
+ arglist,
+ None)
+
+ def test_create_json_binding_profile(self):
+ arglist = [
+ '--network', self._port.network_id,
+ '--binding-profile', '{"parent_name":"fake_parent"}',
+ '--binding-profile', '{"tag":42}',
+ 'test-port',
+ ]
+ verifylist = [
+ ('network', self._port.network_id,),
+ ('enable', True),
+ ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}),
+ ('name', 'test-port'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'network_id': self._port.network_id,
+ 'binding:profile': {'parent_name': 'fake_parent', 'tag': 42},
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+
+class TestDeletePort(TestPort):
+
+ # Ports to delete.
+ _ports = network_fakes.FakePort.create_ports(count=2)
+
+ def setUp(self):
+ super(TestDeletePort, self).setUp()
+
+ self.network.delete_port = mock.Mock(return_value=None)
+ self.network.find_port = network_fakes.FakePort.get_ports(
+ ports=self._ports)
+ # Get the command object to test
+ self.cmd = port.DeletePort(self.app, self.namespace)
+
+ def test_port_delete(self):
+ arglist = [
+ self._ports[0].name,
+ ]
+ verifylist = [
+ ('port', [self._ports[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_port.assert_called_once_with(
+ self._ports[0].name, ignore_missing=False)
+ self.network.delete_port.assert_called_once_with(self._ports[0])
+ self.assertIsNone(result)
+
+ def test_multi_ports_delete(self):
+ arglist = []
+ verifylist = []
+
+ for p in self._ports:
+ arglist.append(p.name)
+ verifylist = [
+ ('port', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for p in self._ports:
+ calls.append(call(p))
+ self.network.delete_port.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_ports_delete_with_exception(self):
+ arglist = [
+ self._ports[0].name,
+ 'unexist_port',
+ ]
+ verifylist = [
+ ('port',
+ [self._ports[0].name, 'unexist_port']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._ports[0], exceptions.CommandError]
+ self.network.find_port = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 ports failed to delete.', str(e))
+
+ self.network.find_port.assert_any_call(
+ self._ports[0].name, ignore_missing=False)
+ self.network.find_port.assert_any_call(
+ 'unexist_port', ignore_missing=False)
+ self.network.delete_port.assert_called_once_with(
+ self._ports[0]
+ )
+
+
+class TestListPort(TestPort):
+
+ _ports = network_fakes.FakePort.create_ports(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'MAC Address',
+ 'Fixed IP Addresses',
+ )
+
+ data = []
+ for prt in _ports:
+ data.append((
+ prt.id,
+ prt.name,
+ prt.mac_address,
+ utils.format_list_of_dicts(prt.fixed_ips),
+ ))
+
+ def setUp(self):
+ super(TestListPort, self).setUp()
+
+ # Get the command object to test
+ self.cmd = port.ListPort(self.app, self.namespace)
+ self.network.ports = mock.Mock(return_value=self._ports)
+ fake_router = network_fakes.FakeRouter.create_one_router({
+ 'id': 'fake-router-id',
+ })
+ self.network.find_router = mock.Mock(return_value=fake_router)
+
+ def test_port_list_no_options(self):
+ arglist = []
+ verifylist = []
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ports.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_port_list_router_opt(self):
+ arglist = [
+ '--router', 'fake-router-name',
+ ]
+
+ verifylist = [
+ ('router', 'fake-router-name')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ports.assert_called_once_with(**{
+ 'device_id': 'fake-router-id'
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_port_list_device_owner_opt(self):
+ arglist = [
+ '--device-owner', self._ports[0].device_owner,
+ ]
+
+ verifylist = [
+ ('device_owner', self._ports[0].device_owner)
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ports.assert_called_once_with(**{
+ 'device_owner': self._ports[0].device_owner
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_port_list_all_opt(self):
+ arglist = [
+ '--device-owner', self._ports[0].device_owner,
+ '--router', 'fake-router-name',
+ ]
+
+ verifylist = [
+ ('device_owner', self._ports[0].device_owner),
+ ('router', 'fake-router-name')
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.ports.assert_called_once_with(**{
+ 'device_owner': self._ports[0].device_owner,
+ 'device_id': 'fake-router-id'
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestSetPort(TestPort):
+
+ _port = network_fakes.FakePort.create_one_port()
+
+ def setUp(self):
+ super(TestSetPort, self).setUp()
+ self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
+ self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
+ self.network.find_port = mock.Mock(return_value=self._port)
+ self.network.update_port = mock.Mock(return_value=None)
+
+ # Get the command object to test
+ self.cmd = port.SetPort(self.app, self.namespace)
+
+ def test_set_fixed_ip(self):
+ arglist = [
+ '--fixed-ip', 'ip-address=10.0.0.11',
+ self._port.name,
+ ]
+ verifylist = [
+ ('fixed_ip', [{'ip-address': '10.0.0.11'}]),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'fixed_ips': [{'ip_address': '10.0.0.11'}],
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_append_fixed_ip(self):
+ _testport = network_fakes.FakePort.create_one_port(
+ {'fixed_ips': [{'ip_address': '0.0.0.1'}]})
+ self.network.find_port = mock.Mock(return_value=_testport)
+ arglist = [
+ '--fixed-ip', 'ip-address=10.0.0.12',
+ _testport.name,
+ ]
+ verifylist = [
+ ('fixed_ip', [{'ip-address': '10.0.0.12'}]),
+ ('port', _testport.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'fixed_ips': [
+ {'ip_address': '10.0.0.12'}, {'ip_address': '0.0.0.1'}],
+ }
+ self.network.update_port.assert_called_once_with(_testport, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_this(self):
+ arglist = [
+ '--disable',
+ '--no-fixed-ip',
+ '--no-binding-profile',
+ self._port.name,
+ ]
+ verifylist = [
+ ('disable', True),
+ ('no_binding_profile', True),
+ ('no_fixed_ip', True),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': False,
+ 'binding:profile': {},
+ 'fixed_ips': [],
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_that(self):
+ arglist = [
+ '--enable',
+ '--vnic-type', 'macvtap',
+ '--binding-profile', 'foo=bar',
+ '--host', 'binding-host-id-xxxx',
+ '--name', 'newName',
+ self._port.name,
+ ]
+ verifylist = [
+ ('enable', True),
+ ('vnic_type', 'macvtap'),
+ ('binding_profile', {'foo': 'bar'}),
+ ('host', 'binding-host-id-xxxx'),
+ ('name', 'newName'),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': True,
+ 'binding:vnic_type': 'macvtap',
+ 'binding:profile': {'foo': 'bar'},
+ 'binding:host_id': 'binding-host-id-xxxx',
+ 'name': 'newName',
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_nothing(self):
+ arglist = [
+ self._port.name,
+ ]
+ verifylist = [
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_invalid_json_binding_profile(self):
+ arglist = [
+ '--binding-profile', '{"parent_name"}',
+ 'test-port',
+ ]
+ self.assertRaises(argparse.ArgumentTypeError,
+ self.check_parser,
+ self.cmd,
+ arglist,
+ None)
+
+ def test_set_invalid_key_value_binding_profile(self):
+ arglist = [
+ '--binding-profile', 'key',
+ 'test-port',
+ ]
+ self.assertRaises(argparse.ArgumentTypeError,
+ self.check_parser,
+ self.cmd,
+ arglist,
+ None)
+
+ def test_set_mixed_binding_profile(self):
+ arglist = [
+ '--binding-profile', 'foo=bar',
+ '--binding-profile', '{"foo2": "bar2"}',
+ self._port.name,
+ ]
+ verifylist = [
+ ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowPort(TestPort):
+
+ # The port to show.
+ _port = network_fakes.FakePort.create_one_port()
+
+ def setUp(self):
+ super(TestShowPort, self).setUp()
+
+ self.network.find_port = mock.Mock(return_value=self._port)
+
+ # Get the command object to test
+ self.cmd = port.ShowPort(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._port.name,
+ ]
+ verifylist = [
+ ('port', self._port.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_port.assert_called_once_with(
+ self._port.name, ignore_missing=False)
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+
+class TestUnsetPort(TestPort):
+
+ def setUp(self):
+ super(TestUnsetPort, self).setUp()
+ self._testport = network_fakes.FakePort.create_one_port(
+ {'fixed_ips': [{'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
+ 'ip_address': '0.0.0.1'},
+ {'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
+ 'ip_address': '1.0.0.0'}],
+ 'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'}})
+ self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet(
+ {'id': '042eb10a-3a18-4658-ab-cf47c8d03152'})
+ self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
+ self.network.find_port = mock.Mock(return_value=self._testport)
+ self.network.update_port = mock.Mock(return_value=None)
+ # Get the command object to test
+ self.cmd = port.UnsetPort(self.app, self.namespace)
+
+ def test_unset_port_parameters(self):
+ arglist = [
+ '--fixed-ip',
+ 'subnet=042eb10a-3a18-4658-ab-cf47c8d03152,ip-address=1.0.0.0',
+ '--binding-profile', 'Superman',
+ self._testport.name,
+ ]
+ verifylist = [
+ ('fixed_ip', [{
+ 'subnet': '042eb10a-3a18-4658-ab-cf47c8d03152',
+ 'ip-address': '1.0.0.0'}]),
+ ('binding_profile', ['Superman']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'fixed_ips': [{
+ 'subnet_id': '042eb10a-3a18-4658-ab-cf47c8d03152',
+ 'ip_address': '0.0.0.1'}],
+ 'binding:profile': {'batman': 'Joker'}
+ }
+ self.network.update_port.assert_called_once_with(
+ self._testport, **attrs)
+ self.assertIsNone(result)
+
+ def test_unset_port_fixed_ip_not_existent(self):
+ arglist = [
+ '--fixed-ip', 'ip-address=1.0.0.1',
+ '--binding-profile', 'Superman',
+ self._testport.name,
+ ]
+ verifylist = [
+ ('fixed_ip', [{'ip-address': '1.0.0.1'}]),
+ ('binding_profile', ['Superman']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
+
+ def test_unset_port_binding_profile_not_existent(self):
+ arglist = [
+ '--fixed-ip', 'ip-address=1.0.0.0',
+ '--binding-profile', 'Neo',
+ self._testport.name,
+ ]
+ verifylist = [
+ ('fixed_ip', [{'ip-address': '1.0.0.0'}]),
+ ('binding_profile', ['Neo']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)
diff --git a/openstackclient/tests/unit/network/v2/test_router.py b/openstackclient/tests/unit/network/v2/test_router.py
new file mode 100644
index 00000000..26fe655e
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_router.py
@@ -0,0 +1,751 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+from osc_lib import utils as osc_utils
+
+from openstackclient.network.v2 import router
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestRouter(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestRouter, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+
+
+class TestAddPortToRouter(TestRouter):
+ '''Add port to Router '''
+
+ _port = network_fakes.FakePort.create_one_port()
+ _router = network_fakes.FakeRouter.create_one_router(
+ attrs={'port': _port.id})
+
+ def setUp(self):
+ super(TestAddPortToRouter, self).setUp()
+ self.network.router_add_interface = mock.Mock()
+ self.cmd = router.AddPortToRouter(self.app, self.namespace)
+ self.network.find_router = mock.Mock(return_value=self._router)
+ self.network.find_port = mock.Mock(return_value=self._port)
+
+ def test_add_port_no_option(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_add_port_required_options(self):
+ arglist = [
+ self._router.id,
+ self._router.port,
+ ]
+ verifylist = [
+ ('router', self._router.id),
+ ('port', self._router.port),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.router_add_interface.assert_called_with(self._router, **{
+ 'port_id': self._router.port,
+ })
+ self.assertIsNone(result)
+
+
+class TestAddSubnetToRouter(TestRouter):
+ '''Add subnet to Router '''
+
+ _subnet = network_fakes.FakeSubnet.create_one_subnet()
+ _router = network_fakes.FakeRouter.create_one_router(
+ attrs={'subnet': _subnet.id})
+
+ def setUp(self):
+ super(TestAddSubnetToRouter, self).setUp()
+ self.network.router_add_interface = mock.Mock()
+ self.cmd = router.AddSubnetToRouter(self.app, self.namespace)
+ self.network.find_router = mock.Mock(return_value=self._router)
+ self.network.find_subnet = mock.Mock(return_value=self._subnet)
+
+ def test_add_subnet_no_option(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_add_subnet_required_options(self):
+ arglist = [
+ self._router.id,
+ self._router.subnet,
+ ]
+ verifylist = [
+ ('router', self._router.id),
+ ('subnet', self._router.subnet),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.router_add_interface.assert_called_with(
+ self._router, **{'subnet_id': self._router.subnet})
+
+ self.assertIsNone(result)
+
+
+class TestCreateRouter(TestRouter):
+
+ # The new router created.
+ new_router = network_fakes.FakeRouter.create_one_router()
+
+ columns = (
+ 'admin_state_up',
+ 'availability_zone_hints',
+ 'availability_zones',
+ 'distributed',
+ 'external_gateway_info',
+ 'ha',
+ 'id',
+ 'name',
+ 'project_id',
+ 'routes',
+ 'status',
+ )
+ data = (
+ router._format_admin_state(new_router.admin_state_up),
+ osc_utils.format_list(new_router.availability_zone_hints),
+ osc_utils.format_list(new_router.availability_zones),
+ new_router.distributed,
+ router._format_external_gateway_info(new_router.external_gateway_info),
+ new_router.ha,
+ new_router.id,
+ new_router.name,
+ new_router.tenant_id,
+ router._format_routes(new_router.routes),
+ new_router.status,
+ )
+
+ def setUp(self):
+ super(TestCreateRouter, self).setUp()
+
+ self.network.create_router = mock.Mock(return_value=self.new_router)
+
+ # Get the command object to test
+ self.cmd = router.CreateRouter(self.app, self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ self.new_router.name,
+ ]
+ verifylist = [
+ ('name', self.new_router.name),
+ ('enable', True),
+ ('distributed', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_router.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self.new_router.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_with_AZ_hints(self):
+ arglist = [
+ self.new_router.name,
+ '--availability-zone-hint', 'fake-az',
+ '--availability-zone-hint', 'fake-az2',
+ ]
+ verifylist = [
+ ('name', self.new_router.name),
+ ('availability_zone_hints', ['fake-az', 'fake-az2']),
+ ('enable', True),
+ ('distributed', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+ self.network.create_router.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'name': self.new_router.name,
+ 'availability_zone_hints': ['fake-az', 'fake-az2'],
+ })
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteRouter(TestRouter):
+
+ # The routers to delete.
+ _routers = network_fakes.FakeRouter.create_routers(count=2)
+
+ def setUp(self):
+ super(TestDeleteRouter, self).setUp()
+
+ self.network.delete_router = mock.Mock(return_value=None)
+
+ self.network.find_router = (
+ network_fakes.FakeRouter.get_routers(self._routers))
+
+ # Get the command object to test
+ self.cmd = router.DeleteRouter(self.app, self.namespace)
+
+ def test_router_delete(self):
+ arglist = [
+ self._routers[0].name,
+ ]
+ verifylist = [
+ ('router', [self._routers[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.delete_router.assert_called_once_with(self._routers[0])
+ self.assertIsNone(result)
+
+ def test_multi_routers_delete(self):
+ arglist = []
+ verifylist = []
+
+ for r in self._routers:
+ arglist.append(r.name)
+ verifylist = [
+ ('router', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for r in self._routers:
+ calls.append(call(r))
+ self.network.delete_router.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_routers_delete_with_exception(self):
+ arglist = [
+ self._routers[0].name,
+ 'unexist_router',
+ ]
+ verifylist = [
+ ('router',
+ [self._routers[0].name, 'unexist_router']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._routers[0], exceptions.CommandError]
+ self.network.find_router = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 routers failed to delete.', str(e))
+
+ self.network.find_router.assert_any_call(
+ self._routers[0].name, ignore_missing=False)
+ self.network.find_router.assert_any_call(
+ 'unexist_router', ignore_missing=False)
+ self.network.delete_router.assert_called_once_with(
+ self._routers[0]
+ )
+
+
+class TestListRouter(TestRouter):
+
+ # The routers going to be listed up.
+ routers = network_fakes.FakeRouter.create_routers(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Status',
+ 'State',
+ 'Distributed',
+ 'HA',
+ 'Project',
+ )
+ columns_long = columns + (
+ 'Routes',
+ 'External gateway info',
+ 'Availability zones'
+ )
+
+ data = []
+ for r in routers:
+ data.append((
+ r.id,
+ r.name,
+ r.status,
+ router._format_admin_state(r.admin_state_up),
+ r.distributed,
+ r.ha,
+ r.tenant_id,
+ ))
+ data_long = []
+ for i in range(0, len(routers)):
+ r = routers[i]
+ data_long.append(
+ data[i] + (
+ router._format_routes(r.routes),
+ router._format_external_gateway_info(r.external_gateway_info),
+ osc_utils.format_list(r.availability_zones),
+ )
+ )
+
+ def setUp(self):
+ super(TestListRouter, self).setUp()
+
+ # Get the command object to test
+ self.cmd = router.ListRouter(self.app, self.namespace)
+
+ self.network.routers = mock.Mock(return_value=self.routers)
+
+ def test_router_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.routers.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_router_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # In base command class Lister in cliff, abstract method take_action()
+ # returns a tuple containing the column names and an iterable
+ # containing the data to be listed.
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.routers.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+
+class TestRemovePortFromRouter(TestRouter):
+ '''Remove port from a Router '''
+
+ _port = network_fakes.FakePort.create_one_port()
+ _router = network_fakes.FakeRouter.create_one_router(
+ attrs={'port': _port.id})
+
+ def setUp(self):
+ super(TestRemovePortFromRouter, self).setUp()
+ self.network.router_remove_interface = mock.Mock()
+ self.cmd = router.RemovePortFromRouter(self.app, self.namespace)
+ self.network.find_router = mock.Mock(return_value=self._router)
+ self.network.find_port = mock.Mock(return_value=self._port)
+
+ def test_remove_port_no_option(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_remove_port_required_options(self):
+ arglist = [
+ self._router.id,
+ self._router.port,
+ ]
+ verifylist = [
+ ('router', self._router.id),
+ ('port', self._router.port),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.router_remove_interface.assert_called_with(
+ self._router, **{'port_id': self._router.port})
+ self.assertIsNone(result)
+
+
+class TestRemoveSubnetFromRouter(TestRouter):
+ '''Remove subnet from Router '''
+
+ _subnet = network_fakes.FakeSubnet.create_one_subnet()
+ _router = network_fakes.FakeRouter.create_one_router(
+ attrs={'subnet': _subnet.id})
+
+ def setUp(self):
+ super(TestRemoveSubnetFromRouter, self).setUp()
+ self.network.router_remove_interface = mock.Mock()
+ self.cmd = router.RemoveSubnetFromRouter(self.app, self.namespace)
+ self.network.find_router = mock.Mock(return_value=self._router)
+ self.network.find_subnet = mock.Mock(return_value=self._subnet)
+
+ def test_remove_subnet_no_option(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_remove_subnet_required_options(self):
+ arglist = [
+ self._router.id,
+ self._router.subnet,
+ ]
+ verifylist = [
+ ('subnet', self._router.subnet),
+ ('router', self._router.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.router_remove_interface.assert_called_with(
+ self._router, **{'subnet_id': self._router.subnet})
+ self.assertIsNone(result)
+
+
+class TestSetRouter(TestRouter):
+
+ # The router to set.
+ _default_route = {'destination': '10.20.20.0/24', 'nexthop': '10.20.30.1'}
+ _router = network_fakes.FakeRouter.create_one_router(
+ attrs={'routes': [_default_route]}
+ )
+
+ def setUp(self):
+ super(TestSetRouter, self).setUp()
+
+ self.network.update_router = mock.Mock(return_value=None)
+
+ self.network.find_router = mock.Mock(return_value=self._router)
+
+ # Get the command object to test
+ self.cmd = router.SetRouter(self.app, self.namespace)
+
+ def test_set_this(self):
+ arglist = [
+ self._router.name,
+ '--enable',
+ '--distributed',
+ '--name', 'noob',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('enable', True),
+ ('distributed', True),
+ ('name', 'noob'),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': True,
+ 'distributed': True,
+ 'name': 'noob',
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_that(self):
+ arglist = [
+ self._router.name,
+ '--disable',
+ '--centralized',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('disable', True),
+ ('centralized', True),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'admin_state_up': False,
+ 'distributed': False,
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_distributed_centralized(self):
+ arglist = [
+ self._router.name,
+ '--distributed',
+ '--centralized',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('distributed', True),
+ ('distributed', False),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_set_route(self):
+ arglist = [
+ self._router.name,
+ '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('routes', [{'destination': '10.20.30.0/24',
+ 'gateway': '10.20.30.1'}]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'routes': self._router.routes + [{'destination': '10.20.30.0/24',
+ 'nexthop': '10.20.30.1'}],
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_route(self):
+ arglist = [
+ self._router.name,
+ '--no-route',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('no_route', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'routes': [],
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_route_no_route(self):
+ arglist = [
+ self._router.name,
+ '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
+ '--no-route',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('routes', [{'destination': '10.20.30.0/24',
+ 'gateway': '10.20.30.1'}]),
+ ('no_route', True),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_set_clear_routes(self):
+ arglist = [
+ self._router.name,
+ '--clear-routes',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('clear_routes', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'routes': [],
+ }
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_route_clear_routes(self):
+ arglist = [
+ self._router.name,
+ '--route', 'destination=10.20.30.0/24,gateway=10.20.30.1',
+ '--clear-routes',
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ('routes', [{'destination': '10.20.30.0/24',
+ 'gateway': '10.20.30.1'}]),
+ ('clear_routes', True),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_set_nothing(self):
+ arglist = [
+ self._router.name,
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_router.assert_called_once_with(
+ self._router, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowRouter(TestRouter):
+
+ # The router to set.
+ _router = network_fakes.FakeRouter.create_one_router()
+
+ columns = (
+ 'admin_state_up',
+ 'availability_zone_hints',
+ 'availability_zones',
+ 'distributed',
+ 'external_gateway_info',
+ 'ha',
+ 'id',
+ 'name',
+ 'project_id',
+ 'routes',
+ 'status',
+ )
+ data = (
+ router._format_admin_state(_router.admin_state_up),
+ osc_utils.format_list(_router.availability_zone_hints),
+ osc_utils.format_list(_router.availability_zones),
+ _router.distributed,
+ router._format_external_gateway_info(_router.external_gateway_info),
+ _router.ha,
+ _router.id,
+ _router.name,
+ _router.tenant_id,
+ router._format_routes(_router.routes),
+ _router.status,
+ )
+
+ def setUp(self):
+ super(TestShowRouter, self).setUp()
+
+ self.network.find_router = mock.Mock(return_value=self._router)
+
+ # Get the command object to test
+ self.cmd = router.ShowRouter(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._router.name,
+ ]
+ verifylist = [
+ ('router', self._router.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_router.assert_called_once_with(
+ self._router.name, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestUnsetRouter(TestRouter):
+
+ def setUp(self):
+ super(TestUnsetRouter, self).setUp()
+ self._testrouter = network_fakes.FakeRouter.create_one_router(
+ {'routes': [{"destination": "192.168.101.1/24",
+ "gateway": "172.24.4.3"},
+ {"destination": "192.168.101.2/24",
+ "gateway": "172.24.4.3"}], })
+ self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
+ self.network.find_router = mock.Mock(return_value=self._testrouter)
+ self.network.update_router = mock.Mock(return_value=None)
+ # Get the command object to test
+ self.cmd = router.UnsetRouter(self.app, self.namespace)
+
+ def test_unset_router_params(self):
+ arglist = [
+ '--route', 'destination=192.168.101.1/24,gateway=172.24.4.3',
+ self._testrouter.name,
+ ]
+ verifylist = [
+ ('routes', [
+ {"destination": "192.168.101.1/24", "gateway": "172.24.4.3"}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'routes': [{"destination": "192.168.101.2/24",
+ "nexthop": "172.24.4.3"}],
+ }
+ self.network.update_router.assert_called_once_with(
+ self._testrouter, **attrs)
+ self.assertIsNone(result)
+
+ def test_unset_router_wrong_routes(self):
+ arglist = [
+ '--route', 'destination=192.168.101.1/24,gateway=172.24.4.2',
+ self._testrouter.name,
+ ]
+ verifylist = [
+ ('routes', [
+ {"destination": "192.168.101.1/24", "gateway": "172.24.4.2"}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group.py b/openstackclient/tests/unit/network/v2/test_security_group.py
new file mode 100644
index 00000000..4c5b2972
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_security_group.py
@@ -0,0 +1,770 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import security_group
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSecurityGroupNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestSecurityGroupNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestSecurityGroupCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestSecurityGroupCompute, self).setUp()
+
+ # Get a shortcut to the compute client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateSecurityGroupNetwork(TestSecurityGroupNetwork):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+ # The security group to be created.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group()
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group.description,
+ _security_group.id,
+ _security_group.name,
+ _security_group.project_id,
+ '',
+ )
+
+ def setUp(self):
+ super(TestCreateSecurityGroupNetwork, self).setUp()
+
+ self.network.create_security_group = mock.Mock(
+ return_value=self._security_group)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = security_group.CreateSecurityGroup(self.app, self.namespace)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_min_options(self):
+ arglist = [
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('name', self._security_group.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group.assert_called_once_with(**{
+ 'description': self._security_group.name,
+ 'name': self._security_group.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options(self):
+ arglist = [
+ '--description', self._security_group.description,
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('description', self._security_group.description),
+ ('name', self._security_group.name),
+ ('project', self.project.name),
+ ('project_domain', self.domain.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group.assert_called_once_with(**{
+ 'description': self._security_group.description,
+ 'name': self._security_group.name,
+ 'tenant_id': self.project.id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+ # The security group to be shown.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group.description,
+ _security_group.id,
+ _security_group.name,
+ _security_group.tenant_id,
+ '',
+ )
+
+ def setUp(self):
+ super(TestCreateSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.create.return_value = self._security_group
+
+ # Get the command object to test
+ self.cmd = security_group.CreateSecurityGroup(self.app, None)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_network_options(self):
+ arglist = [
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ self._security_group.name,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_min_options(self):
+ arglist = [
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('name', self._security_group.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.create.assert_called_once_with(
+ self._security_group.name,
+ self._security_group.name)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_all_options(self):
+ arglist = [
+ '--description', self._security_group.description,
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('description', self._security_group.description),
+ ('name', self._security_group.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.create.assert_called_once_with(
+ self._security_group.name,
+ self._security_group.description)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
+
+ # The security groups to be deleted.
+ _security_groups = \
+ network_fakes.FakeSecurityGroup.create_security_groups()
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupNetwork, self).setUp()
+
+ self.network.delete_security_group = mock.Mock(return_value=None)
+
+ self.network.find_security_group = (
+ network_fakes.FakeSecurityGroup.get_security_groups(
+ self._security_groups)
+ )
+
+ # Get the command object to test
+ self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace)
+
+ def test_security_group_delete(self):
+ arglist = [
+ self._security_groups[0].name,
+ ]
+ verifylist = [
+ ('group', [self._security_groups[0].name]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_security_group.assert_called_once_with(
+ self._security_groups[0])
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_groups:
+ arglist.append(s.name)
+ verifylist = [
+ ('group', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_groups:
+ calls.append(call(s))
+ self.network.delete_security_group.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete_with_exception(self):
+ arglist = [
+ self._security_groups[0].name,
+ 'unexist_security_group',
+ ]
+ verifylist = [
+ ('group',
+ [self._security_groups[0].name, 'unexist_security_group']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._security_groups[0], exceptions.CommandError]
+ self.network.find_security_group = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 groups failed to delete.', str(e))
+
+ self.network.find_security_group.assert_any_call(
+ self._security_groups[0].name, ignore_missing=False)
+ self.network.find_security_group.assert_any_call(
+ 'unexist_security_group', ignore_missing=False)
+ self.network.delete_security_group.assert_called_once_with(
+ self._security_groups[0]
+ )
+
+
+class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security groups to be deleted.
+ _security_groups = \
+ compute_fakes.FakeSecurityGroup.create_security_groups()
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.delete = mock.Mock(return_value=None)
+
+ self.compute.security_groups.get = (
+ compute_fakes.FakeSecurityGroup.get_security_groups(
+ self._security_groups)
+ )
+
+ # Get the command object to test
+ self.cmd = security_group.DeleteSecurityGroup(self.app, None)
+
+ def test_security_group_delete(self):
+ arglist = [
+ self._security_groups[0].id,
+ ]
+ verifylist = [
+ ('group', [self._security_groups[0].id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.delete.assert_called_once_with(
+ self._security_groups[0].id)
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_groups:
+ arglist.append(s.id)
+ verifylist = [
+ ('group', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_groups:
+ calls.append(call(s.id))
+ self.compute.security_groups.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete_with_exception(self):
+ arglist = [
+ self._security_groups[0].id,
+ 'unexist_security_group',
+ ]
+ verifylist = [
+ ('group',
+ [self._security_groups[0].id, 'unexist_security_group']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._security_groups[0], exceptions.CommandError]
+ self.compute.security_groups.get = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+ self.compute.security_groups.find.side_effect = (
+ exceptions.NotFound(None))
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 groups failed to delete.', str(e))
+
+ self.compute.security_groups.get.assert_any_call(
+ self._security_groups[0].id)
+ self.compute.security_groups.get.assert_any_call(
+ 'unexist_security_group')
+ self.compute.security_groups.delete.assert_called_once_with(
+ self._security_groups[0].id
+ )
+
+
+class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
+
+ # The security group to be listed.
+ _security_groups = \
+ network_fakes.FakeSecurityGroup.create_security_groups(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Description',
+ 'Project',
+ )
+
+ data = []
+ for grp in _security_groups:
+ data.append((
+ grp.id,
+ grp.name,
+ grp.description,
+ grp.tenant_id,
+ ))
+
+ def setUp(self):
+ super(TestListSecurityGroupNetwork, self).setUp()
+
+ self.network.security_groups = mock.Mock(
+ return_value=self._security_groups)
+
+ # Get the command object to test
+ self.cmd = security_group.ListSecurityGroup(self.app, self.namespace)
+
+ def test_security_group_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('all_projects', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.security_groups.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_security_group_list_all_projects(self):
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.security_groups.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestListSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group to be listed.
+ _security_groups = \
+ compute_fakes.FakeSecurityGroup.create_security_groups(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Description',
+ )
+ columns_all_projects = (
+ 'ID',
+ 'Name',
+ 'Description',
+ 'Project',
+ )
+
+ data = []
+ for grp in _security_groups:
+ data.append((
+ grp.id,
+ grp.name,
+ grp.description,
+ ))
+ data_all_projects = []
+ for grp in _security_groups:
+ data_all_projects.append((
+ grp.id,
+ grp.name,
+ grp.description,
+ grp.tenant_id,
+ ))
+
+ def setUp(self):
+ super(TestListSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+ self.compute.security_groups.list.return_value = self._security_groups
+
+ # Get the command object to test
+ self.cmd = security_group.ListSecurityGroup(self.app, None)
+
+ def test_security_group_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('all_projects', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'search_opts': {'all_tenants': False}}
+ self.compute.security_groups.list.assert_called_once_with(**kwargs)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_security_group_list_all_projects(self):
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ kwargs = {'search_opts': {'all_tenants': True}}
+ self.compute.security_groups.list.assert_called_once_with(**kwargs)
+ self.assertEqual(self.columns_all_projects, columns)
+ self.assertEqual(self.data_all_projects, list(data))
+
+
+class TestSetSecurityGroupNetwork(TestSecurityGroupNetwork):
+
+ # The security group to be set.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def setUp(self):
+ super(TestSetSecurityGroupNetwork, self).setUp()
+
+ self.network.update_security_group = mock.Mock(return_value=None)
+
+ self.network.find_security_group = mock.Mock(
+ return_value=self._security_group)
+
+ # Get the command object to test
+ self.cmd = security_group.SetSecurityGroup(self.app, self.namespace)
+
+ def test_set_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_set_no_updates(self):
+ arglist = [
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('group', self._security_group.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_security_group.assert_called_once_with(
+ self._security_group,
+ **{}
+ )
+ self.assertIsNone(result)
+
+ def test_set_all_options(self):
+ new_name = 'new-' + self._security_group.name
+ new_description = 'new-' + self._security_group.description
+ arglist = [
+ '--name', new_name,
+ '--description', new_description,
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('description', new_description),
+ ('group', self._security_group.name),
+ ('name', new_name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'description': new_description,
+ 'name': new_name,
+ }
+ self.network.update_security_group.assert_called_once_with(
+ self._security_group,
+ **attrs
+ )
+ self.assertIsNone(result)
+
+
+class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group to be set.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def setUp(self):
+ super(TestSetSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.update = mock.Mock(return_value=None)
+
+ self.compute.security_groups.get = mock.Mock(
+ return_value=self._security_group)
+
+ # Get the command object to test
+ self.cmd = security_group.SetSecurityGroup(self.app, None)
+
+ def test_set_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_set_no_updates(self):
+ arglist = [
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('group', self._security_group.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.update.assert_called_once_with(
+ self._security_group,
+ self._security_group.name,
+ self._security_group.description
+ )
+ self.assertIsNone(result)
+
+ def test_set_all_options(self):
+ new_name = 'new-' + self._security_group.name
+ new_description = 'new-' + self._security_group.description
+ arglist = [
+ '--name', new_name,
+ '--description', new_description,
+ self._security_group.name,
+ ]
+ verifylist = [
+ ('description', new_description),
+ ('group', self._security_group.name),
+ ('name', new_name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.update.assert_called_once_with(
+ self._security_group,
+ new_name,
+ new_description
+ )
+ self.assertIsNone(result)
+
+
+class TestShowSecurityGroupNetwork(TestSecurityGroupNetwork):
+
+ # The security group rule to be shown with the group.
+ _security_group_rule = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ # The security group to be shown.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group(
+ attrs={'security_group_rules': [_security_group_rule._info]}
+ )
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group.description,
+ _security_group.id,
+ _security_group.name,
+ _security_group.project_id,
+ security_group._format_network_security_group_rules(
+ [_security_group_rule._info]),
+ )
+
+ def setUp(self):
+ super(TestShowSecurityGroupNetwork, self).setUp()
+
+ self.network.find_security_group = mock.Mock(
+ return_value=self._security_group)
+
+ # Get the command object to test
+ self.cmd = security_group.ShowSecurityGroup(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_security_group.assert_called_once_with(
+ self._security_group.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
+
+ # The security group rule to be shown with the group.
+ _security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ # The security group to be shown.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group(
+ attrs={'rules': [_security_group_rule._info]}
+ )
+
+ columns = (
+ 'description',
+ 'id',
+ 'name',
+ 'project_id',
+ 'rules',
+ )
+
+ data = (
+ _security_group.description,
+ _security_group.id,
+ _security_group.name,
+ _security_group.tenant_id,
+ security_group._format_compute_security_group_rules(
+ [_security_group_rule._info]),
+ )
+
+ def setUp(self):
+ super(TestShowSecurityGroupCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.get.return_value = self._security_group
+
+ # Get the command object to test
+ self.cmd = security_group.ShowSecurityGroup(self.app, None)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.get.assert_called_once_with(
+ self._security_group.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_security_group_rule.py
new file mode 100644
index 00000000..51e18a65
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_security_group_rule.py
@@ -0,0 +1,1177 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+import mock
+from mock import call
+
+from osc_lib import exceptions
+
+from openstackclient.network import utils as network_utils
+from openstackclient.network.v2 import security_group_rule
+from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
+from openstackclient.tests.unit import fakes
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestSecurityGroupRuleNetwork, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestSecurityGroupRuleCompute, self).setUp()
+
+ # Get a shortcut to the network client
+ self.compute = self.app.client_manager.compute
+
+
+class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+ # The security group rule to be created.
+ _security_group_rule = None
+
+ # The security group that will contain the rule created.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group()
+
+ expected_columns = (
+ 'direction',
+ 'ethertype',
+ 'id',
+ 'port_range_max',
+ 'port_range_min',
+ 'project_id',
+ 'protocol',
+ 'remote_group_id',
+ 'remote_ip_prefix',
+ 'security_group_id',
+ )
+
+ expected_data = None
+
+ def _setup_security_group_rule(self, attrs=None):
+ self._security_group_rule = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
+ attrs)
+ self.network.create_security_group_rule = mock.Mock(
+ return_value=self._security_group_rule)
+ self.expected_data = (
+ self._security_group_rule.direction,
+ self._security_group_rule.ethertype,
+ self._security_group_rule.id,
+ self._security_group_rule.port_range_max,
+ self._security_group_rule.port_range_min,
+ self._security_group_rule.project_id,
+ self._security_group_rule.protocol,
+ self._security_group_rule.remote_group_id,
+ self._security_group_rule.remote_ip_prefix,
+ self._security_group_rule.security_group_id,
+ )
+
+ def setUp(self):
+ super(TestCreateSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.find_security_group = mock.Mock(
+ return_value=self._security_group)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Get the command object to test
+ self.cmd = security_group_rule.CreateSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_all_source_options(self):
+ arglist = [
+ '--src-ip', '10.10.0.0/24',
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_bad_ethertype(self):
+ arglist = [
+ '--ethertype', 'foo',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_all_protocol_options(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--proto', 'tcp',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_all_port_range_options(self):
+ arglist = [
+ '--dst-port', '80:80',
+ '--icmp-type', '3',
+ '--icmp-code', '1',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (80, 80)),
+ ('icmp_type', 3),
+ ('icmp_code', 1),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_default_rule(self):
+ self._setup_security_group_rule({
+ 'port_range_max': 443,
+ 'port_range_min': 443,
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.port_range_min),
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.port_range_min,
+ self._security_group_rule.port_range_max)),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_proto_option(self):
+ self._setup_security_group_rule({
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '10.0.2.0/24',
+ })
+ arglist = [
+ '--proto', self._security_group_rule.protocol,
+ '--src-ip', self._security_group_rule.remote_ip_prefix,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.protocol),
+ ('protocol', None),
+ ('src_ip', self._security_group_rule.remote_ip_prefix),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_source_group(self):
+ self._setup_security_group_rule({
+ 'port_range_max': 22,
+ 'port_range_min': 22,
+ 'remote_group_id': self._security_group.id,
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.port_range_min),
+ '--ingress',
+ '--src-group', self._security_group.name,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.port_range_min,
+ self._security_group_rule.port_range_max)),
+ ('ingress', True),
+ ('src_group', self._security_group.name),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_group_id': self._security_group_rule.remote_group_id,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_source_ip(self):
+ self._setup_security_group_rule({
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '10.0.2.0/24',
+ })
+ arglist = [
+ '--protocol', self._security_group_rule.protocol,
+ '--src-ip', self._security_group_rule.remote_ip_prefix,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', self._security_group_rule.protocol),
+ ('src_ip', self._security_group_rule.remote_ip_prefix),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_network_options(self):
+ self._setup_security_group_rule({
+ 'direction': 'egress',
+ 'ethertype': 'IPv6',
+ 'port_range_max': 443,
+ 'port_range_min': 443,
+ 'protocol': '6',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': None,
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.port_range_min),
+ '--egress',
+ '--ethertype', self._security_group_rule.ethertype,
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.port_range_min,
+ self._security_group_rule.port_range_max)),
+ ('egress', True),
+ ('ethertype', self._security_group_rule.ethertype),
+ ('project', self.project.name),
+ ('project_domain', self.domain.name),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'security_group_id': self._security_group.id,
+ 'tenant_id': self.project.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_tcp_with_icmp_type(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--icmp-type', '15',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', 'tcp'),
+ ('icmp_type', 15),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_icmp_code(self):
+ arglist = [
+ '--protocol', '1',
+ '--icmp-code', '1',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', '1'),
+ ('icmp_code', 1),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError, self.cmd.take_action,
+ parsed_args)
+
+ def test_create_icmp_type(self):
+ self._setup_security_group_rule({
+ 'port_range_min': 15,
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '0.0.0.0/0',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', None),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_ipv6_icmp_type_code(self):
+ self._setup_security_group_rule({
+ 'ethertype': 'IPv6',
+ 'port_range_min': 139,
+ 'port_range_max': 2,
+ 'protocol': 'ipv6-icmp',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--icmp-code', str(self._security_group_rule.port_range_max),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', self._security_group_rule.port_range_max),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'protocol': self._security_group_rule.protocol,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_icmpv6_type(self):
+ self._setup_security_group_rule({
+ 'ethertype': 'IPv6',
+ 'port_range_min': 139,
+ 'protocol': 'icmpv6',
+ })
+ arglist = [
+ '--icmp-type', str(self._security_group_rule.port_range_min),
+ '--protocol', self._security_group_rule.protocol,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', None),
+ ('icmp_type', self._security_group_rule.port_range_min),
+ ('icmp_code', None),
+ ('protocol', self._security_group_rule.protocol),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns, columns)
+ self.assertEqual(self.expected_data, data)
+
+
+class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ project = identity_fakes.FakeProject.create_one_project()
+ domain = identity_fakes.FakeDomain.create_one_domain()
+ # The security group rule to be created.
+ _security_group_rule = None
+
+ # The security group that will contain the rule created.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def _setup_security_group_rule(self, attrs=None):
+ self._security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
+ attrs)
+ self.compute.security_group_rules.create.return_value = \
+ self._security_group_rule
+ expected_columns, expected_data = \
+ security_group_rule._format_security_group_rule_show(
+ self._security_group_rule._info)
+ return expected_columns, expected_data
+
+ def setUp(self):
+ super(TestCreateSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.get.return_value = self._security_group
+
+ # Get the command object to test
+ self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_all_source_options(self):
+ arglist = [
+ '--src-ip', '10.10.0.0/24',
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_bad_protocol(self):
+ arglist = [
+ '--protocol', 'foo',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_all_protocol_options(self):
+ arglist = [
+ '--protocol', 'tcp',
+ '--proto', 'tcp',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_network_options(self):
+ arglist = [
+ '--ingress',
+ '--ethertype', 'IPv4',
+ '--icmp-type', '3',
+ '--icmp-code', '11',
+ '--project', self.project.name,
+ '--project-domain', self.domain.name,
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_default_rule(self):
+ expected_columns, expected_data = self._setup_security_group_rule()
+ dst_port = str(self._security_group_rule.from_port) + ':' + \
+ str(self._security_group_rule.to_port)
+ arglist = [
+ '--dst-port', dst_port,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_source_group(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'from_port': 22,
+ 'to_port': 22,
+ 'group': {'name': self._security_group.name},
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.from_port),
+ '--src-group', self._security_group.name,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('src_group', self._security_group.name),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_source_ip(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--protocol', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('protocol', self._security_group_rule.ip_protocol),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_proto_option(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--proto', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.ip_protocol),
+ ('protocol', None),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+
+class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ # The security group rules to be deleted.
+ _security_group_rules = \
+ network_fakes.FakeSecurityGroupRule.create_security_group_rules(
+ count=2)
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.delete_security_group_rule = mock.Mock(return_value=None)
+
+ self.network.find_security_group_rule = (
+ network_fakes.FakeSecurityGroupRule.get_security_group_rules(
+ self._security_group_rules)
+ )
+
+ # Get the command object to test
+ self.cmd = security_group_rule.DeleteSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_security_group_rule_delete(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ ]
+ verifylist = [
+ ('rule', [self._security_group_rules[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_security_group_rule.assert_called_once_with(
+ self._security_group_rules[0])
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_group_rules:
+ arglist.append(s.id)
+ verifylist = [
+ ('rule', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_group_rules:
+ calls.append(call(s))
+ self.network.delete_security_group_rule.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete_with_exception(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ 'unexist_rule',
+ ]
+ verifylist = [
+ ('rule',
+ [self._security_group_rules[0].id, 'unexist_rule']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [
+ self._security_group_rules[0], exceptions.CommandError]
+ self.network.find_security_group_rule = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 rules failed to delete.', str(e))
+
+ self.network.find_security_group_rule.assert_any_call(
+ self._security_group_rules[0].id, ignore_missing=False)
+ self.network.find_security_group_rule.assert_any_call(
+ 'unexist_rule', ignore_missing=False)
+ self.network.delete_security_group_rule.assert_called_once_with(
+ self._security_group_rules[0]
+ )
+
+
+class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be deleted.
+ _security_group_rules = \
+ compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
+ count=2)
+
+ def setUp(self):
+ super(TestDeleteSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Get the command object to test
+ self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_delete(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ ]
+ verifylist = [
+ ('rule', [self._security_group_rules[0].id]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.delete.assert_called_once_with(
+ self._security_group_rules[0].id)
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_group_rules:
+ arglist.append(s.id)
+ verifylist = [
+ ('rule', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_group_rules:
+ calls.append(call(s.id))
+ self.compute.security_group_rules.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete_with_exception(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ 'unexist_rule',
+ ]
+ verifylist = [
+ ('rule',
+ [self._security_group_rules[0].id, 'unexist_rule']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [None, exceptions.CommandError]
+ self.compute.security_group_rules.delete = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 rules failed to delete.', str(e))
+
+ self.compute.security_group_rules.delete.assert_any_call(
+ self._security_group_rules[0].id)
+ self.compute.security_group_rules.delete.assert_any_call(
+ 'unexist_rule')
+
+
+class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ # The security group to hold the rules.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group()
+
+ # The security group rule to be listed.
+ _security_group_rule_tcp = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'protocol': 'tcp',
+ 'port_range_max': 80,
+ 'port_range_min': 80,
+ 'security_group_id': _security_group.id,
+ })
+ _security_group_rule_icmp = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'protocol': 'icmp',
+ 'remote_ip_prefix': '10.0.2.0/24',
+ 'security_group_id': _security_group.id,
+ })
+ _security_group.security_group_rules = [_security_group_rule_tcp._info,
+ _security_group_rule_icmp._info]
+ _security_group_rules = [_security_group_rule_tcp,
+ _security_group_rule_icmp]
+
+ expected_columns_with_group_and_long = (
+ 'ID',
+ 'IP Protocol',
+ 'IP Range',
+ 'Port Range',
+ 'Direction',
+ 'Ethertype',
+ 'Remote Security Group',
+ )
+ expected_columns_no_group = (
+ 'ID',
+ 'IP Protocol',
+ 'IP Range',
+ 'Port Range',
+ 'Remote Security Group',
+ 'Security Group',
+ )
+
+ expected_data_with_group_and_long = []
+ expected_data_no_group = []
+ for _security_group_rule in _security_group_rules:
+ expected_data_with_group_and_long.append((
+ _security_group_rule.id,
+ _security_group_rule.protocol,
+ _security_group_rule.remote_ip_prefix,
+ security_group_rule._format_network_port_range(
+ _security_group_rule),
+ _security_group_rule.direction,
+ _security_group_rule.ethertype,
+ _security_group_rule.remote_group_id,
+ ))
+ expected_data_no_group.append((
+ _security_group_rule.id,
+ _security_group_rule.protocol,
+ _security_group_rule.remote_ip_prefix,
+ security_group_rule._format_network_port_range(
+ _security_group_rule),
+ _security_group_rule.remote_group_id,
+ _security_group_rule.security_group_id,
+ ))
+
+ def setUp(self):
+ super(TestListSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.find_security_group = mock.Mock(
+ return_value=self._security_group)
+ self.network.security_group_rules = mock.Mock(
+ return_value=self._security_group_rules)
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ListSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_list_default(self):
+ self._security_group_rule_tcp.port_range_min = 80
+ parsed_args = self.check_parser(self.cmd, [], [])
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.security_group_rules.assert_called_once_with(**{})
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+ def test_list_with_group_and_long(self):
+ self._security_group_rule_tcp.port_range_min = 80
+ arglist = [
+ '--long',
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('long', True),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.security_group_rules.assert_called_once_with(**{
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(self.expected_columns_with_group_and_long, columns)
+ self.assertEqual(self.expected_data_with_group_and_long, list(data))
+
+ def test_list_with_ignored_options(self):
+ self._security_group_rule_tcp.port_range_min = 80
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.security_group_rules.assert_called_once_with(**{})
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+
+class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group to hold the rules.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ # The security group rule to be listed.
+ _security_group_rule_tcp = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'ip_protocol': 'tcp',
+ 'from_port': 80,
+ 'to_port': 80,
+ 'group': {'name': _security_group.name},
+ })
+ _security_group_rule_icmp = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ 'group': {'name': _security_group.name},
+ })
+ _security_group.rules = [_security_group_rule_tcp._info,
+ _security_group_rule_icmp._info]
+
+ expected_columns_with_group = (
+ 'ID',
+ 'IP Protocol',
+ 'IP Range',
+ 'Port Range',
+ 'Remote Security Group',
+ )
+ expected_columns_no_group = \
+ expected_columns_with_group + ('Security Group',)
+
+ expected_data_with_group = []
+ expected_data_no_group = []
+ for _security_group_rule in _security_group.rules:
+ rule = network_utils.transform_compute_security_group_rule(
+ _security_group_rule
+ )
+ expected_rule_with_group = (
+ rule['id'],
+ rule['ip_protocol'],
+ rule['ip_range'],
+ rule['port_range'],
+ rule['remote_security_group'],
+ )
+ expected_rule_no_group = expected_rule_with_group + \
+ (_security_group_rule['parent_group_id'],)
+ expected_data_with_group.append(expected_rule_with_group)
+ expected_data_no_group.append(expected_rule_no_group)
+
+ def setUp(self):
+ super(TestListSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.get.return_value = \
+ self._security_group
+ self.compute.security_groups.list.return_value = \
+ [self._security_group]
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
+
+ def test_list_default(self):
+ parsed_args = self.check_parser(self.cmd, [], [])
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.security_groups.list.assert_called_once_with(
+ search_opts={'all_tenants': False}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+ def test_list_with_group(self):
+ arglist = [
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.security_groups.get.assert_called_once_with(
+ self._security_group.id
+ )
+ self.assertEqual(self.expected_columns_with_group, columns)
+ self.assertEqual(self.expected_data_with_group, list(data))
+
+ def test_list_all_projects(self):
+ arglist = [
+ '--all-projects',
+ ]
+ verifylist = [
+ ('all_projects', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.security_groups.list.assert_called_once_with(
+ search_opts={'all_tenants': True}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+ def test_list_with_ignored_options(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.compute.security_groups.list.assert_called_once_with(
+ search_opts={'all_tenants': False}
+ )
+ self.assertEqual(self.expected_columns_no_group, columns)
+ self.assertEqual(self.expected_data_no_group, list(data))
+
+
+class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ # The security group rule to be shown.
+ _security_group_rule = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ columns = (
+ 'direction',
+ 'ethertype',
+ 'id',
+ 'port_range_max',
+ 'port_range_min',
+ 'project_id',
+ 'protocol',
+ 'remote_group_id',
+ 'remote_ip_prefix',
+ 'security_group_id',
+ )
+
+ data = (
+ _security_group_rule.direction,
+ _security_group_rule.ethertype,
+ _security_group_rule.id,
+ _security_group_rule.port_range_max,
+ _security_group_rule.port_range_min,
+ _security_group_rule.project_id,
+ _security_group_rule.protocol,
+ _security_group_rule.remote_group_id,
+ _security_group_rule.remote_ip_prefix,
+ _security_group_rule.security_group_id,
+ )
+
+ def setUp(self):
+ super(TestShowSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.find_security_group_rule = mock.Mock(
+ return_value=self._security_group_rule)
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ShowSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group_rule.id,
+ ]
+ verifylist = [
+ ('rule', self._security_group_rule.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_security_group_rule.assert_called_once_with(
+ self._security_group_rule.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be shown.
+ _security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ columns, data = \
+ security_group_rule._format_security_group_rule_show(
+ _security_group_rule._info)
+
+ def setUp(self):
+ super(TestShowSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Build a security group fake customized for this test.
+ security_group_rules = [self._security_group_rule._info]
+ security_group = fakes.FakeResource(
+ info=copy.deepcopy({'rules': security_group_rules}),
+ loaded=True)
+ security_group.rules = security_group_rules
+ self.compute.security_groups.list.return_value = [security_group]
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group_rule.id,
+ ]
+ verifylist = [
+ ('rule', self._security_group_rule.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.list.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_subnet.py b/openstackclient/tests/unit/network/v2/test_subnet.py
new file mode 100644
index 00000000..e31db469
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_subnet.py
@@ -0,0 +1,978 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+from mock import call
+
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.network.v2 import subnet as subnet_v2
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSubnet(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestSubnet, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestCreateSubnet(TestSubnet):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # An IPv4 subnet to be created with mostly default values
+ _subnet = network_fakes.FakeSubnet.create_one_subnet(
+ attrs={
+ 'tenant_id': project.id,
+ }
+ )
+
+ # Subnet pool to be used to create a subnet from a pool
+ _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
+
+ # An IPv4 subnet to be created using a specific subnet pool
+ _subnet_from_pool = network_fakes.FakeSubnet.create_one_subnet(
+ attrs={
+ 'tenant_id': project.id,
+ 'subnetpool_id': _subnet_pool.id,
+ 'dns_nameservers': ['8.8.8.8',
+ '8.8.4.4'],
+ 'host_routes': [{'destination': '10.20.20.0/24',
+ 'nexthop': '10.20.20.1'},
+ {'destination': '10.30.30.0/24',
+ 'nexthop': '10.30.30.1'}],
+ 'service_types': ['network:router_gateway',
+ 'network:floatingip_agent_gateway'],
+ }
+ )
+
+ # An IPv6 subnet to be created with most options specified
+ _subnet_ipv6 = network_fakes.FakeSubnet.create_one_subnet(
+ attrs={
+ 'tenant_id': project.id,
+ 'cidr': 'fe80:0:0:a00a::/64',
+ 'enable_dhcp': True,
+ 'dns_nameservers': ['fe80:27ff:a00a:f00f::ffff',
+ 'fe80:37ff:a00a:f00f::ffff'],
+ 'allocation_pools': [{'start': 'fe80::a00a:0:c0de:0:100',
+ 'end': 'fe80::a00a:0:c0de:0:f000'},
+ {'start': 'fe80::a00a:0:c0de:1:100',
+ 'end': 'fe80::a00a:0:c0de:1:f000'}],
+ 'host_routes': [{'destination': 'fe80:27ff:a00a:f00f::/64',
+ 'nexthop': 'fe80:27ff:a00a:f00f::1'},
+ {'destination': 'fe80:37ff:a00a:f00f::/64',
+ 'nexthop': 'fe80:37ff:a00a:f00f::1'}],
+ 'ip_version': 6,
+ 'gateway_ip': 'fe80::a00a:0:c0de:0:1',
+ 'ipv6_address_mode': 'slaac',
+ 'ipv6_ra_mode': 'slaac',
+ 'subnetpool_id': 'None',
+ 'service_types': ['network:router_gateway',
+ 'network:floatingip_agent_gateway'],
+ }
+ )
+
+ # The network to be returned from find_network
+ _network = network_fakes.FakeNetwork.create_one_network(
+ attrs={
+ 'id': _subnet.network_id,
+ }
+ )
+
+ # The network segment to be returned from find_segment
+ _network_segment = \
+ network_fakes.FakeNetworkSegment.create_one_network_segment(
+ attrs={
+ 'network_id': _subnet.network_id,
+ }
+ )
+
+ columns = (
+ 'allocation_pools',
+ 'cidr',
+ 'dns_nameservers',
+ 'enable_dhcp',
+ 'gateway_ip',
+ 'host_routes',
+ 'id',
+ 'ip_version',
+ 'ipv6_address_mode',
+ 'ipv6_ra_mode',
+ 'name',
+ 'network_id',
+ 'project_id',
+ 'segment_id',
+ 'service_types',
+ 'subnetpool_id',
+ )
+
+ data = (
+ subnet_v2._format_allocation_pools(_subnet.allocation_pools),
+ _subnet.cidr,
+ utils.format_list(_subnet.dns_nameservers),
+ _subnet.enable_dhcp,
+ _subnet.gateway_ip,
+ subnet_v2._format_host_routes(_subnet.host_routes),
+ _subnet.id,
+ _subnet.ip_version,
+ _subnet.ipv6_address_mode,
+ _subnet.ipv6_ra_mode,
+ _subnet.name,
+ _subnet.network_id,
+ _subnet.project_id,
+ _subnet.segment_id,
+ utils.format_list(_subnet.service_types),
+ _subnet.subnetpool_id,
+ )
+
+ data_subnet_pool = (
+ subnet_v2._format_allocation_pools(_subnet_from_pool.allocation_pools),
+ _subnet_from_pool.cidr,
+ utils.format_list(_subnet_from_pool.dns_nameservers),
+ _subnet_from_pool.enable_dhcp,
+ _subnet_from_pool.gateway_ip,
+ subnet_v2._format_host_routes(_subnet_from_pool.host_routes),
+ _subnet_from_pool.id,
+ _subnet_from_pool.ip_version,
+ _subnet_from_pool.ipv6_address_mode,
+ _subnet_from_pool.ipv6_ra_mode,
+ _subnet_from_pool.name,
+ _subnet_from_pool.network_id,
+ _subnet_from_pool.project_id,
+ _subnet_from_pool.segment_id,
+ utils.format_list(_subnet_from_pool.service_types),
+ _subnet_from_pool.subnetpool_id,
+ )
+
+ data_ipv6 = (
+ subnet_v2._format_allocation_pools(_subnet_ipv6.allocation_pools),
+ _subnet_ipv6.cidr,
+ utils.format_list(_subnet_ipv6.dns_nameservers),
+ _subnet_ipv6.enable_dhcp,
+ _subnet_ipv6.gateway_ip,
+ subnet_v2._format_host_routes(_subnet_ipv6.host_routes),
+ _subnet_ipv6.id,
+ _subnet_ipv6.ip_version,
+ _subnet_ipv6.ipv6_address_mode,
+ _subnet_ipv6.ipv6_ra_mode,
+ _subnet_ipv6.name,
+ _subnet_ipv6.network_id,
+ _subnet_ipv6.project_id,
+ _subnet_ipv6.segment_id,
+ utils.format_list(_subnet_ipv6.service_types),
+ _subnet_ipv6.subnetpool_id,
+ )
+
+ def setUp(self):
+ super(TestCreateSubnet, self).setUp()
+
+ # Get the command object to test
+ self.cmd = subnet_v2.CreateSubnet(self.app, self.namespace)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ # Mock SDK calls for all tests.
+ self.network.find_network = mock.Mock(return_value=self._network)
+ self.network.find_segment = mock.Mock(
+ return_value=self._network_segment
+ )
+ self.network.find_subnet_pool = mock.Mock(
+ return_value=self._subnet_pool
+ )
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Testing that a call without the required argument will fail and
+ # throw a "ParserExecption"
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ # Mock SDK calls for this test.
+ self.network.create_subnet = mock.Mock(return_value=self._subnet)
+ self._network.id = self._subnet.network_id
+
+ arglist = [
+ "--subnet-range", self._subnet.cidr,
+ "--network", self._subnet.network_id,
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('name', self._subnet.name),
+ ('subnet_range', self._subnet.cidr),
+ ('network', self._subnet.network_id),
+ ('ip_version', self._subnet.ip_version),
+ ('gateway', 'auto'),
+
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_subnet.assert_called_once_with(**{
+ 'cidr': self._subnet.cidr,
+ 'ip_version': self._subnet.ip_version,
+ 'name': self._subnet.name,
+ 'network_id': self._subnet.network_id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_from_subnet_pool_options(self):
+ # Mock SDK calls for this test.
+ self.network.create_subnet = \
+ mock.Mock(return_value=self._subnet_from_pool)
+ self._network.id = self._subnet_from_pool.network_id
+
+ arglist = [
+ self._subnet_from_pool.name,
+ "--subnet-pool", self._subnet_from_pool.subnetpool_id,
+ "--prefix-length", '24',
+ "--network", self._subnet_from_pool.network_id,
+ "--ip-version", str(self._subnet_from_pool.ip_version),
+ "--gateway", self._subnet_from_pool.gateway_ip,
+ "--dhcp",
+ ]
+
+ for dns_addr in self._subnet_from_pool.dns_nameservers:
+ arglist.append('--dns-nameserver')
+ arglist.append(dns_addr)
+
+ for host_route in self._subnet_from_pool.host_routes:
+ arglist.append('--host-route')
+ value = 'gateway=' + host_route.get('nexthop', '') + \
+ ',destination=' + host_route.get('destination', '')
+ arglist.append(value)
+
+ for service_type in self._subnet_from_pool.service_types:
+ arglist.append('--service-type')
+ arglist.append(service_type)
+
+ verifylist = [
+ ('name', self._subnet_from_pool.name),
+ ('prefix_length', '24'),
+ ('network', self._subnet_from_pool.network_id),
+ ('ip_version', self._subnet_from_pool.ip_version),
+ ('gateway', self._subnet_from_pool.gateway_ip),
+ ('dns_nameservers', self._subnet_from_pool.dns_nameservers),
+ ('dhcp', self._subnet_from_pool.enable_dhcp),
+ ('host_routes', subnet_v2.convert_entries_to_gateway(
+ self._subnet_from_pool.host_routes)),
+ ('subnet_pool', self._subnet_from_pool.subnetpool_id),
+ ('service_types', self._subnet_from_pool.service_types),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_subnet.assert_called_once_with(**{
+ 'dns_nameservers': self._subnet_from_pool.dns_nameservers,
+ 'enable_dhcp': self._subnet_from_pool.enable_dhcp,
+ 'gateway_ip': self._subnet_from_pool.gateway_ip,
+ 'host_routes': self._subnet_from_pool.host_routes,
+ 'ip_version': self._subnet_from_pool.ip_version,
+ 'name': self._subnet_from_pool.name,
+ 'network_id': self._subnet_from_pool.network_id,
+ 'prefixlen': '24',
+ 'subnetpool_id': self._subnet_from_pool.subnetpool_id,
+ 'service_types': self._subnet_from_pool.service_types,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data_subnet_pool, data)
+
+ def test_create_options_subnet_range_ipv6(self):
+ # Mock SDK calls for this test.
+ self.network.create_subnet = mock.Mock(return_value=self._subnet_ipv6)
+ self._network.id = self._subnet_ipv6.network_id
+
+ arglist = [
+ self._subnet_ipv6.name,
+ "--subnet-range", self._subnet_ipv6.cidr,
+ "--network", self._subnet_ipv6.network_id,
+ "--ip-version", str(self._subnet_ipv6.ip_version),
+ "--ipv6-ra-mode", self._subnet_ipv6.ipv6_ra_mode,
+ "--ipv6-address-mode", self._subnet_ipv6.ipv6_address_mode,
+ "--gateway", self._subnet_ipv6.gateway_ip,
+ "--dhcp",
+ ]
+
+ for dns_addr in self._subnet_ipv6.dns_nameservers:
+ arglist.append('--dns-nameserver')
+ arglist.append(dns_addr)
+
+ for host_route in self._subnet_ipv6.host_routes:
+ arglist.append('--host-route')
+ value = 'gateway=' + host_route.get('nexthop', '') + \
+ ',destination=' + host_route.get('destination', '')
+ arglist.append(value)
+
+ for pool in self._subnet_ipv6.allocation_pools:
+ arglist.append('--allocation-pool')
+ value = 'start=' + pool.get('start', '') + \
+ ',end=' + pool.get('end', '')
+ arglist.append(value)
+
+ for service_type in self._subnet_ipv6.service_types:
+ arglist.append('--service-type')
+ arglist.append(service_type)
+
+ verifylist = [
+ ('name', self._subnet_ipv6.name),
+ ('subnet_range', self._subnet_ipv6.cidr),
+ ('network', self._subnet_ipv6.network_id),
+ ('ip_version', self._subnet_ipv6.ip_version),
+ ('ipv6_ra_mode', self._subnet_ipv6.ipv6_ra_mode),
+ ('ipv6_address_mode', self._subnet_ipv6.ipv6_address_mode),
+ ('gateway', self._subnet_ipv6.gateway_ip),
+ ('dns_nameservers', self._subnet_ipv6.dns_nameservers),
+ ('dhcp', self._subnet_ipv6.enable_dhcp),
+ ('host_routes', subnet_v2.convert_entries_to_gateway(
+ self._subnet_ipv6.host_routes)),
+ ('allocation_pools', self._subnet_ipv6.allocation_pools),
+ ('service_types', self._subnet_ipv6.service_types),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_subnet.assert_called_once_with(**{
+ 'cidr': self._subnet_ipv6.cidr,
+ 'dns_nameservers': self._subnet_ipv6.dns_nameservers,
+ 'enable_dhcp': self._subnet_ipv6.enable_dhcp,
+ 'gateway_ip': self._subnet_ipv6.gateway_ip,
+ 'host_routes': self._subnet_ipv6.host_routes,
+ 'ip_version': self._subnet_ipv6.ip_version,
+ 'ipv6_address_mode': self._subnet_ipv6.ipv6_address_mode,
+ 'ipv6_ra_mode': self._subnet_ipv6.ipv6_ra_mode,
+ 'name': self._subnet_ipv6.name,
+ 'network_id': self._subnet_ipv6.network_id,
+ 'allocation_pools': self._subnet_ipv6.allocation_pools,
+ 'service_types': self._subnet_ipv6.service_types,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data_ipv6, data)
+
+ def test_create_no_beta_command_options(self):
+ arglist = [
+ "--subnet-range", self._subnet.cidr,
+ "--network-segment", self._network_segment.id,
+ "--network", self._subnet.network_id,
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('name', self._subnet.name),
+ ('subnet_range', self._subnet.cidr),
+ ('network-segment', self._network_segment.id),
+ ('network', self._subnet.network_id),
+ ]
+ self.app.options.os_beta_command = False
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
+
+ def test_create_with_network_segment(self):
+ # Mock SDK calls for this test.
+ self.network.create_subnet = mock.Mock(return_value=self._subnet)
+ self._network.id = self._subnet.network_id
+
+ arglist = [
+ "--subnet-range", self._subnet.cidr,
+ "--network-segment", self._network_segment.id,
+ "--network", self._subnet.network_id,
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('name', self._subnet.name),
+ ('subnet_range', self._subnet.cidr),
+ ('network_segment', self._network_segment.id),
+ ('network', self._subnet.network_id),
+ ('ip_version', self._subnet.ip_version),
+ ('gateway', 'auto'),
+
+ ]
+
+ self.app.options.os_beta_command = True
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_subnet.assert_called_once_with(**{
+ 'cidr': self._subnet.cidr,
+ 'ip_version': self._subnet.ip_version,
+ 'name': self._subnet.name,
+ 'network_id': self._subnet.network_id,
+ 'segment_id': self._network_segment.id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSubnet(TestSubnet):
+
+ # The subnets to delete.
+ _subnets = network_fakes.FakeSubnet.create_subnets(count=2)
+
+ def setUp(self):
+ super(TestDeleteSubnet, self).setUp()
+
+ self.network.delete_subnet = mock.Mock(return_value=None)
+
+ self.network.find_subnet = (
+ network_fakes.FakeSubnet.get_subnets(self._subnets))
+
+ # Get the command object to test
+ self.cmd = subnet_v2.DeleteSubnet(self.app, self.namespace)
+
+ def test_subnet_delete(self):
+ arglist = [
+ self._subnets[0].name,
+ ]
+ verifylist = [
+ ('subnet', [self._subnets[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.delete_subnet.assert_called_once_with(self._subnets[0])
+ self.assertIsNone(result)
+
+ def test_multi_subnets_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._subnets:
+ arglist.append(s.name)
+ verifylist = [
+ ('subnet', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._subnets:
+ calls.append(call(s))
+ self.network.delete_subnet.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_subnets_delete_with_exception(self):
+ arglist = [
+ self._subnets[0].name,
+ 'unexist_subnet',
+ ]
+ verifylist = [
+ ('subnet',
+ [self._subnets[0].name, 'unexist_subnet']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._subnets[0], exceptions.CommandError]
+ self.network.find_subnet = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 subnets failed to delete.', str(e))
+
+ self.network.find_subnet.assert_any_call(
+ self._subnets[0].name, ignore_missing=False)
+ self.network.find_subnet.assert_any_call(
+ 'unexist_subnet', ignore_missing=False)
+ self.network.delete_subnet.assert_called_once_with(
+ self._subnets[0]
+ )
+
+
+class TestListSubnet(TestSubnet):
+ # The subnets going to be listed up.
+ _subnet = network_fakes.FakeSubnet.create_subnets(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Network',
+ 'Subnet',
+ )
+ columns_long = columns + (
+ 'Project',
+ 'DHCP',
+ 'Name Servers',
+ 'Allocation Pools',
+ 'Host Routes',
+ 'IP Version',
+ 'Gateway',
+ 'Service Types',
+ )
+
+ data = []
+ for subnet in _subnet:
+ data.append((
+ subnet.id,
+ subnet.name,
+ subnet.network_id,
+ subnet.cidr,
+ ))
+
+ data_long = []
+ for subnet in _subnet:
+ data_long.append((
+ subnet.id,
+ subnet.name,
+ subnet.network_id,
+ subnet.cidr,
+ subnet.tenant_id,
+ subnet.enable_dhcp,
+ utils.format_list(subnet.dns_nameservers),
+ subnet_v2._format_allocation_pools(subnet.allocation_pools),
+ utils.format_list(subnet.host_routes),
+ subnet.ip_version,
+ subnet.gateway_ip,
+ utils.format_list(subnet.service_types),
+ ))
+
+ def setUp(self):
+ super(TestListSubnet, self).setUp()
+
+ # Get the command object to test
+ self.cmd = subnet_v2.ListSubnet(self.app, self.namespace)
+
+ self.network.subnets = mock.Mock(return_value=self._subnet)
+
+ def test_subnet_list_no_options(self):
+ arglist = []
+ verifylist = [
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.subnets.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.subnets.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+ def test_subnet_list_ip_version(self):
+ arglist = [
+ '--ip-version', str(4),
+ ]
+ verifylist = [
+ ('ip_version', 4),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'ip_version': 4}
+
+ self.network.subnets.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_list_dhcp(self):
+ arglist = [
+ '--dhcp',
+ ]
+ verifylist = [
+ ('dhcp', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'enable_dhcp': True}
+
+ self.network.subnets.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_list_no_dhcp(self):
+ arglist = [
+ '--no-dhcp',
+ ]
+ verifylist = [
+ ('no_dhcp', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'enable_dhcp': False}
+
+ self.network.subnets.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_list_service_type(self):
+ arglist = [
+ '--service-type', 'network:router_gateway',
+ ]
+ verifylist = [
+ ('service_types', ['network:router_gateway']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'service_types': ['network:router_gateway']}
+
+ self.network.subnets.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_list_service_type_multiple(self):
+ arglist = [
+ '--service-type', 'network:router_gateway',
+ '--service-type', 'network:floatingip_agent_gateway',
+ ]
+ verifylist = [
+ ('service_types', ['network:router_gateway',
+ 'network:floatingip_agent_gateway']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ filters = {'service_types': ['network:router_gateway',
+ 'network:floatingip_agent_gateway']}
+
+ self.network.subnets.assert_called_once_with(**filters)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestSetSubnet(TestSubnet):
+
+ _subnet = network_fakes.FakeSubnet.create_one_subnet()
+
+ def setUp(self):
+ super(TestSetSubnet, self).setUp()
+ self.network.update_subnet = mock.Mock(return_value=None)
+ self.network.find_subnet = mock.Mock(return_value=self._subnet)
+ self.cmd = subnet_v2.SetSubnet(self.app, self.namespace)
+
+ def test_set_this(self):
+ arglist = [
+ "--name", "new_subnet",
+ "--dhcp",
+ "--gateway", self._subnet.gateway_ip,
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('name', "new_subnet"),
+ ('dhcp', True),
+ ('gateway', self._subnet.gateway_ip),
+ ('subnet', self._subnet.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'enable_dhcp': True,
+ 'gateway_ip': self._subnet.gateway_ip,
+ 'name': "new_subnet",
+ }
+ self.network.update_subnet.assert_called_with(self._subnet, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_that(self):
+ arglist = [
+ "--name", "new_subnet",
+ "--no-dhcp",
+ "--gateway", "none",
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('name', "new_subnet"),
+ ('no_dhcp', True),
+ ('gateway', "none"),
+ ('subnet', self._subnet.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'enable_dhcp': False,
+ 'gateway_ip': None,
+ 'name': "new_subnet",
+ }
+ self.network.update_subnet.assert_called_with(self._subnet, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_nothing(self):
+ arglist = [self._subnet.name, ]
+ verifylist = [('subnet', self._subnet.name)]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_subnet.assert_called_with(self._subnet, **attrs)
+ self.assertIsNone(result)
+
+ def test_append_options(self):
+ _testsubnet = network_fakes.FakeSubnet.create_one_subnet(
+ {'dns_nameservers': ["10.0.0.1"],
+ 'service_types': ["network:router_gateway"]})
+ self.network.find_subnet = mock.Mock(return_value=_testsubnet)
+ arglist = [
+ '--dns-nameserver', '10.0.0.2',
+ '--service-type', 'network:floatingip_agent_gateway',
+ _testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['10.0.0.2']),
+ ('service_types', ['network:floatingip_agent_gateway']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'dns_nameservers': ['10.0.0.2', '10.0.0.1'],
+ 'service_types': ['network:floatingip_agent_gateway',
+ 'network:router_gateway'],
+ }
+ self.network.update_subnet.assert_called_once_with(
+ _testsubnet, **attrs)
+ self.assertIsNone(result)
+
+
+class TestShowSubnet(TestSubnet):
+ # The subnets to be shown
+ _subnet = network_fakes.FakeSubnet.create_one_subnet()
+
+ columns = (
+ 'allocation_pools',
+ 'cidr',
+ 'dns_nameservers',
+ 'enable_dhcp',
+ 'gateway_ip',
+ 'host_routes',
+ 'id',
+ 'ip_version',
+ 'ipv6_address_mode',
+ 'ipv6_ra_mode',
+ 'name',
+ 'network_id',
+ 'project_id',
+ 'segment_id',
+ 'service_types',
+ 'subnetpool_id',
+ )
+
+ data = (
+ subnet_v2._format_allocation_pools(_subnet.allocation_pools),
+ _subnet.cidr,
+ utils.format_list(_subnet.dns_nameservers),
+ _subnet.enable_dhcp,
+ _subnet.gateway_ip,
+ utils.format_list(_subnet.host_routes),
+ _subnet.id,
+ _subnet.ip_version,
+ _subnet.ipv6_address_mode,
+ _subnet.ipv6_ra_mode,
+ _subnet.name,
+ _subnet.network_id,
+ _subnet.tenant_id,
+ _subnet.segment_id,
+ utils.format_list(_subnet.service_types),
+ _subnet.subnetpool_id,
+ )
+
+ def setUp(self):
+ super(TestShowSubnet, self).setUp()
+
+ # Get the command object to test
+ self.cmd = subnet_v2.ShowSubnet(self.app, self.namespace)
+
+ self.network.find_subnet = mock.Mock(return_value=self._subnet)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Testing that a call without the required argument will fail and
+ # throw a "ParserExecption"
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._subnet.name,
+ ]
+ verifylist = [
+ ('subnet', self._subnet.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_subnet.assert_called_once_with(
+ self._subnet.name, ignore_missing=False)
+
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestUnsetSubnet(TestSubnet):
+
+ def setUp(self):
+ super(TestUnsetSubnet, self).setUp()
+ self._testsubnet = network_fakes.FakeSubnet.create_one_subnet(
+ {'dns_nameservers': ['8.8.8.8',
+ '8.8.8.4'],
+ 'host_routes': [{'destination': '10.20.20.0/24',
+ 'nexthop': '10.20.20.1'},
+ {'destination': '10.30.30.30/24',
+ 'nexthop': '10.30.30.1'}],
+ 'allocation_pools': [{'start': '8.8.8.100',
+ 'end': '8.8.8.150'},
+ {'start': '8.8.8.160',
+ 'end': '8.8.8.170'}],
+ 'service_types': ['network:router_gateway',
+ 'network:floatingip_agent_gateway'], })
+ self.network.find_subnet = mock.Mock(return_value=self._testsubnet)
+ self.network.update_subnet = mock.Mock(return_value=None)
+ # Get the command object to test
+ self.cmd = subnet_v2.UnsetSubnet(self.app, self.namespace)
+
+ def test_unset_subnet_params(self):
+ arglist = [
+ '--dns-nameserver', '8.8.8.8',
+ '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
+ '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
+ '--service-type', 'network:router_gateway',
+ self._testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['8.8.8.8']),
+ ('host_routes', [{
+ "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
+ ('allocation_pools', [{
+ 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
+ ('service_types', ['network:router_gateway']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'dns_nameservers': ['8.8.8.4'],
+ 'host_routes': [{
+ "destination": "10.20.20.0/24", "nexthop": "10.20.20.1"}],
+ 'allocation_pools': [{'start': '8.8.8.160', 'end': '8.8.8.170'}],
+ 'service_types': ['network:floatingip_agent_gateway'],
+ }
+ self.network.update_subnet.assert_called_once_with(
+ self._testsubnet, **attrs)
+ self.assertIsNone(result)
+
+ def test_unset_subnet_wrong_host_routes(self):
+ arglist = [
+ '--dns-nameserver', '8.8.8.8',
+ '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.2',
+ '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
+ self._testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['8.8.8.8']),
+ ('host_routes', [{
+ "destination": "10.30.30.30/24", "gateway": "10.30.30.2"}]),
+ ('allocation_pools', [{
+ 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
+
+ def test_unset_subnet_wrong_allocation_pool(self):
+ arglist = [
+ '--dns-nameserver', '8.8.8.8',
+ '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
+ '--allocation-pool', 'start=8.8.8.100,end=8.8.8.156',
+ self._testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['8.8.8.8']),
+ ('host_routes', [{
+ "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
+ ('allocation_pools', [{
+ 'start': '8.8.8.100', 'end': '8.8.8.156'}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
+
+ def test_unset_subnet_wrong_dns_nameservers(self):
+ arglist = [
+ '--dns-nameserver', '8.8.8.1',
+ '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
+ '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
+ self._testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['8.8.8.1']),
+ ('host_routes', [{
+ "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
+ ('allocation_pools', [{
+ 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
+
+ def test_unset_subnet_wrong_service_type(self):
+ arglist = [
+ '--dns-nameserver', '8.8.8.8',
+ '--host-route', 'destination=10.30.30.30/24,gateway=10.30.30.1',
+ '--allocation-pool', 'start=8.8.8.100,end=8.8.8.150',
+ '--service-type', 'network:dhcp',
+ self._testsubnet.name,
+ ]
+ verifylist = [
+ ('dns_nameservers', ['8.8.8.8']),
+ ('host_routes', [{
+ "destination": "10.30.30.30/24", "gateway": "10.30.30.1"}]),
+ ('allocation_pools', [{
+ 'start': '8.8.8.100', 'end': '8.8.8.150'}]),
+ ('service_types', ['network:dhcp']),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action, parsed_args)
diff --git a/openstackclient/tests/unit/network/v2/test_subnet_pool.py b/openstackclient/tests/unit/network/v2/test_subnet_pool.py
new file mode 100644
index 00000000..0d9494c0
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_subnet_pool.py
@@ -0,0 +1,722 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import mock
+from mock import call
+
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.network.v2 import subnet_pool
+from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+class TestSubnetPool(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestSubnetPool, self).setUp()
+
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+ # Get a shortcut to the DomainManager Mock
+ self.domains_mock = self.app.client_manager.identity.domains
+
+
+class TestCreateSubnetPool(TestSubnetPool):
+
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ domain = identity_fakes_v3.FakeDomain.create_one_domain()
+ # The new subnet pool to create.
+ _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
+
+ _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
+
+ columns = (
+ 'address_scope_id',
+ 'default_prefixlen',
+ 'default_quota',
+ 'id',
+ 'ip_version',
+ 'is_default',
+ 'max_prefixlen',
+ 'min_prefixlen',
+ 'name',
+ 'prefixes',
+ 'project_id',
+ 'shared',
+ )
+ data = (
+ _subnet_pool.address_scope_id,
+ _subnet_pool.default_prefixlen,
+ _subnet_pool.default_quota,
+ _subnet_pool.id,
+ _subnet_pool.ip_version,
+ _subnet_pool.is_default,
+ _subnet_pool.max_prefixlen,
+ _subnet_pool.min_prefixlen,
+ _subnet_pool.name,
+ utils.format_list(_subnet_pool.prefixes),
+ _subnet_pool.project_id,
+ _subnet_pool.shared,
+ )
+
+ def setUp(self):
+ super(TestCreateSubnetPool, self).setUp()
+
+ self.network.create_subnet_pool = mock.Mock(
+ return_value=self._subnet_pool)
+
+ # Get the command object to test
+ self.cmd = subnet_pool.CreateSubnetPool(self.app, self.namespace)
+
+ self.network.find_address_scope = mock.Mock(
+ return_value=self._address_scope)
+
+ self.projects_mock.get.return_value = self.project
+ self.domains_mock.get.return_value = self.domain
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_no_pool_prefix(self):
+ """Make sure --pool-prefix is a required argument"""
+ arglist = [
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('name', self._subnet_pool.name),
+ ]
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--pool-prefix', '10.0.10.0/24',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('prefixes', ['10.0.10.0/24']),
+ ('name', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_subnet_pool.assert_called_once_with(**{
+ 'prefixes': ['10.0.10.0/24'],
+ 'name': self._subnet_pool.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_prefixlen_options(self):
+ arglist = [
+ '--default-prefix-length', self._subnet_pool.default_prefixlen,
+ '--max-prefix-length', self._subnet_pool.max_prefixlen,
+ '--min-prefix-length', self._subnet_pool.min_prefixlen,
+ '--pool-prefix', '10.0.10.0/24',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('default_prefix_length',
+ int(self._subnet_pool.default_prefixlen)),
+ ('max_prefix_length', int(self._subnet_pool.max_prefixlen)),
+ ('min_prefix_length', int(self._subnet_pool.min_prefixlen)),
+ ('name', self._subnet_pool.name),
+ ('prefixes', ['10.0.10.0/24']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_subnet_pool.assert_called_once_with(**{
+ 'default_prefixlen': int(self._subnet_pool.default_prefixlen),
+ 'max_prefixlen': int(self._subnet_pool.max_prefixlen),
+ 'min_prefixlen': int(self._subnet_pool.min_prefixlen),
+ 'prefixes': ['10.0.10.0/24'],
+ 'name': self._subnet_pool.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_len_negative(self):
+ arglist = [
+ self._subnet_pool.name,
+ '--min-prefix-length', '-16',
+ ]
+ verifylist = [
+ ('subnet_pool', self._subnet_pool.name),
+ ('min_prefix_length', '-16'),
+ ]
+
+ self.assertRaises(argparse.ArgumentTypeError, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_project_domain(self):
+ arglist = [
+ '--pool-prefix', '10.0.10.0/24',
+ "--project", self.project.name,
+ "--project-domain", self.domain.name,
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('prefixes', ['10.0.10.0/24']),
+ ('project', self.project.name),
+ ('project_domain', self.domain.name),
+ ('name', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_subnet_pool.assert_called_once_with(**{
+ 'prefixes': ['10.0.10.0/24'],
+ 'tenant_id': self.project.id,
+ 'name': self._subnet_pool.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_address_scope_option(self):
+ arglist = [
+ '--pool-prefix', '10.0.10.0/24',
+ '--address-scope', self._address_scope.id,
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('prefixes', ['10.0.10.0/24']),
+ ('address_scope', self._address_scope.id),
+ ('name', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_subnet_pool.assert_called_once_with(**{
+ 'prefixes': ['10.0.10.0/24'],
+ 'address_scope_id': self._address_scope.id,
+ 'name': self._subnet_pool.name,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_default_and_shared_options(self):
+ arglist = [
+ '--pool-prefix', '10.0.10.0/24',
+ '--default',
+ '--share',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('prefixes', ['10.0.10.0/24']),
+ ('default', True),
+ ('share', True),
+ ('name', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_subnet_pool.assert_called_once_with(**{
+ 'is_default': True,
+ 'name': self._subnet_pool.name,
+ 'prefixes': ['10.0.10.0/24'],
+ 'shared': True,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestDeleteSubnetPool(TestSubnetPool):
+
+ # The subnet pools to delete.
+ _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=2)
+
+ def setUp(self):
+ super(TestDeleteSubnetPool, self).setUp()
+
+ self.network.delete_subnet_pool = mock.Mock(return_value=None)
+
+ self.network.find_subnet_pool = (
+ network_fakes.FakeSubnetPool.get_subnet_pools(self._subnet_pools)
+ )
+
+ # Get the command object to test
+ self.cmd = subnet_pool.DeleteSubnetPool(self.app, self.namespace)
+
+ def test_subnet_pool_delete(self):
+ arglist = [
+ self._subnet_pools[0].name,
+ ]
+ verifylist = [
+ ('subnet_pool', [self._subnet_pools[0].name]),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.delete_subnet_pool.assert_called_once_with(
+ self._subnet_pools[0])
+ self.assertIsNone(result)
+
+ def test_multi_subnet_pools_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._subnet_pools:
+ arglist.append(s.name)
+ verifylist = [
+ ('subnet_pool', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._subnet_pools:
+ calls.append(call(s))
+ self.network.delete_subnet_pool.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_subnet_pools_delete_with_exception(self):
+ arglist = [
+ self._subnet_pools[0].name,
+ 'unexist_subnet_pool',
+ ]
+ verifylist = [
+ ('subnet_pool',
+ [self._subnet_pools[0].name, 'unexist_subnet_pool']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._subnet_pools[0], exceptions.CommandError]
+ self.network.find_subnet_pool = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 subnet pools failed to delete.', str(e))
+
+ self.network.find_subnet_pool.assert_any_call(
+ self._subnet_pools[0].name, ignore_missing=False)
+ self.network.find_subnet_pool.assert_any_call(
+ 'unexist_subnet_pool', ignore_missing=False)
+ self.network.delete_subnet_pool.assert_called_once_with(
+ self._subnet_pools[0]
+ )
+
+
+class TestListSubnetPool(TestSubnetPool):
+ # The subnet pools going to be listed up.
+ _subnet_pools = network_fakes.FakeSubnetPool.create_subnet_pools(count=3)
+
+ columns = (
+ 'ID',
+ 'Name',
+ 'Prefixes',
+ )
+ columns_long = columns + (
+ 'Default Prefix Length',
+ 'Address Scope',
+ 'Default Subnet Pool',
+ 'Shared',
+ )
+
+ data = []
+ for pool in _subnet_pools:
+ data.append((
+ pool.id,
+ pool.name,
+ utils.format_list(pool.prefixes),
+ ))
+
+ data_long = []
+ for pool in _subnet_pools:
+ data_long.append((
+ pool.id,
+ pool.name,
+ utils.format_list(pool.prefixes),
+ pool.default_prefixlen,
+ pool.address_scope_id,
+ pool.is_default,
+ pool.shared,
+ ))
+
+ def setUp(self):
+ super(TestListSubnetPool, self).setUp()
+
+ # Get the command object to test
+ self.cmd = subnet_pool.ListSubnetPool(self.app, self.namespace)
+
+ self.network.subnet_pools = mock.Mock(return_value=self._subnet_pools)
+
+ def test_subnet_pool_list_no_option(self):
+ arglist = []
+ verifylist = [
+ ('long', False),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.subnet_pools.assert_called_once_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_subnet_pool_list_long(self):
+ arglist = [
+ '--long',
+ ]
+ verifylist = [
+ ('long', True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.subnet_pools.assert_called_once_with()
+ self.assertEqual(self.columns_long, columns)
+ self.assertEqual(self.data_long, list(data))
+
+
+class TestSetSubnetPool(TestSubnetPool):
+
+ # The subnet_pool to set.
+ _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
+
+ _address_scope = network_fakes.FakeAddressScope.create_one_address_scope()
+
+ def setUp(self):
+ super(TestSetSubnetPool, self).setUp()
+
+ self.network.update_subnet_pool = mock.Mock(return_value=None)
+
+ self.network.find_subnet_pool = mock.Mock(
+ return_value=self._subnet_pool)
+
+ self.network.find_address_scope = mock.Mock(
+ return_value=self._address_scope)
+
+ # Get the command object to test
+ self.cmd = subnet_pool.SetSubnetPool(self.app, self.namespace)
+
+ def test_set_this(self):
+ arglist = [
+ '--name', 'noob',
+ '--default-prefix-length', '8',
+ '--min-prefix-length', '8',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('name', 'noob'),
+ ('default_prefix_length', 8),
+ ('min_prefix_length', 8),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'name': 'noob',
+ 'default_prefixlen': 8,
+ 'min_prefixlen': 8,
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_that(self):
+ arglist = [
+ '--pool-prefix', '10.0.1.0/24',
+ '--pool-prefix', '10.0.2.0/24',
+ '--max-prefix-length', '16',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('prefixes', ['10.0.1.0/24', '10.0.2.0/24']),
+ ('max_prefix_length', 16),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ prefixes = ['10.0.1.0/24', '10.0.2.0/24']
+ prefixes.extend(self._subnet_pool.prefixes)
+ attrs = {
+ 'prefixes': prefixes,
+ 'max_prefixlen': 16,
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_nothing(self):
+ arglist = [self._subnet_pool.name, ]
+ verifylist = [('subnet_pool', self._subnet_pool.name), ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {}
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_len_negative(self):
+ arglist = [
+ '--max-prefix-length', '-16',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('max_prefix_length', '-16'),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+
+ self.assertRaises(argparse.ArgumentTypeError, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_set_address_scope(self):
+ arglist = [
+ '--address-scope', self._address_scope.id,
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('address_scope', self._address_scope.id),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'address_scope_id': self._address_scope.id,
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_address_scope(self):
+ arglist = [
+ '--no-address-scope',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('no_address_scope', True),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'address_scope_id': None,
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_address_scope_conflict(self):
+ arglist = [
+ '--address-scope', self._address_scope.id,
+ '--no-address-scope',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('address_scope', self._address_scope.id),
+ ('no_address_scope', True),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+
+ # Exclusive arguments will conflict here.
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_set_default(self):
+ arglist = [
+ '--default',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('default', True),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'is_default': True
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_default(self):
+ arglist = [
+ '--no-default',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('no_default', True),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'is_default': False,
+ }
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnet_pool, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_default_conflict(self):
+ arglist = [
+ '--default',
+ '--no-default',
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('default', True),
+ ('no_default', True),
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+
+ # Exclusive arguments will conflict here.
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+
+class TestShowSubnetPool(TestSubnetPool):
+
+ # The subnet_pool to set.
+ _subnet_pool = network_fakes.FakeSubnetPool.create_one_subnet_pool()
+
+ columns = (
+ 'address_scope_id',
+ 'default_prefixlen',
+ 'default_quota',
+ 'id',
+ 'ip_version',
+ 'is_default',
+ 'max_prefixlen',
+ 'min_prefixlen',
+ 'name',
+ 'prefixes',
+ 'project_id',
+ 'shared',
+ )
+
+ data = (
+ _subnet_pool.address_scope_id,
+ _subnet_pool.default_prefixlen,
+ _subnet_pool.default_quota,
+ _subnet_pool.id,
+ _subnet_pool.ip_version,
+ _subnet_pool.is_default,
+ _subnet_pool.max_prefixlen,
+ _subnet_pool.min_prefixlen,
+ _subnet_pool.name,
+ utils.format_list(_subnet_pool.prefixes),
+ _subnet_pool.tenant_id,
+ _subnet_pool.shared,
+ )
+
+ def setUp(self):
+ super(TestShowSubnetPool, self).setUp()
+
+ self.network.find_subnet_pool = mock.Mock(
+ return_value=self._subnet_pool
+ )
+
+ # Get the command object to test
+ self.cmd = subnet_pool.ShowSubnetPool(self.app, self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self._subnet_pool.name,
+ ]
+ verifylist = [
+ ('subnet_pool', self._subnet_pool.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_subnet_pool.assert_called_once_with(
+ self._subnet_pool.name,
+ ignore_missing=False
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+
+class TestUnsetSubnetPool(TestSubnetPool):
+
+ def setUp(self):
+ super(TestUnsetSubnetPool, self).setUp()
+ self._subnetpool = network_fakes.FakeSubnetPool.create_one_subnet_pool(
+ {'prefixes': ['10.0.10.0/24', '10.1.10.0/24',
+ '10.2.10.0/24'], })
+ self.network.find_subnet_pool = mock.Mock(
+ return_value=self._subnetpool)
+ self.network.update_subnet_pool = mock.Mock(return_value=None)
+ # Get the command object to test
+ self.cmd = subnet_pool.UnsetSubnetPool(self.app, self.namespace)
+
+ def test_unset_subnet_pool(self):
+ arglist = [
+ '--pool-prefix', '10.0.10.0/24',
+ '--pool-prefix', '10.1.10.0/24',
+ self._subnetpool.name,
+ ]
+ verifylist = [('prefixes', ['10.0.10.0/24', '10.1.10.0/24'])]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {'prefixes': ['10.2.10.0/24']}
+ self.network.update_subnet_pool.assert_called_once_with(
+ self._subnetpool, **attrs)
+ self.assertIsNone(result)
+
+ def test_unset_subnet_pool_prefix_not_existent(self):
+ arglist = [
+ '--pool-prefix', '10.100.1.1/25',
+ self._subnetpool.name,
+ ]
+ verifylist = [('prefixes', ['10.100.1.1/25'])]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)