summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/api/compute_v2.py211
-rw-r--r--openstackclient/compute/client.py22
-rw-r--r--openstackclient/compute/v2/server.py16
-rw-r--r--openstackclient/network/v2/security_group.py36
-rw-r--r--openstackclient/network/v2/security_group_rule.py42
-rw-r--r--openstackclient/tests/functional/compute/v2/test_server.py4
-rw-r--r--openstackclient/tests/unit/api/test_compute_v2.py228
-rw-r--r--openstackclient/tests/unit/compute/v2/fakes.py15
-rw-r--r--openstackclient/tests/unit/compute/v2/test_server.py45
-rw-r--r--openstackclient/tests/unit/fakes.py3
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_compute.py200
-rw-r--r--openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py172
12 files changed, 729 insertions, 265 deletions
diff --git a/openstackclient/api/compute_v2.py b/openstackclient/api/compute_v2.py
new file mode 100644
index 00000000..3bf3a0d8
--- /dev/null
+++ b/openstackclient/api/compute_v2.py
@@ -0,0 +1,211 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Compute v2 API Library"""
+
+from keystoneauth1 import exceptions as ksa_exceptions
+from osc_lib.api import api
+from osc_lib import exceptions
+from osc_lib.i18n import _
+
+
+class APIv2(api.BaseAPI):
+ """Compute v2 API"""
+
+ def __init__(self, **kwargs):
+ super(APIv2, self).__init__(**kwargs)
+
+ # Overrides
+
+ # TODO(dtroyer): Override find() until these fixes get into an osc-lib
+ # minimum release
+ def find(
+ self,
+ path,
+ value=None,
+ attr=None,
+ ):
+ """Find a single resource by name or ID
+
+ :param string path:
+ The API-specific portion of the URL path
+ :param string value:
+ search expression (required, really)
+ :param string attr:
+ name of attribute for secondary search
+ """
+
+ try:
+ ret = self._request('GET', "/%s/%s" % (path, value)).json()
+ if isinstance(ret, dict):
+ # strip off the enclosing dict
+ key = list(ret.keys())[0]
+ ret = ret[key]
+ except (
+ ksa_exceptions.NotFound,
+ ksa_exceptions.BadRequest,
+ ):
+ kwargs = {attr: value}
+ try:
+ ret = self.find_one(path, **kwargs)
+ except ksa_exceptions.NotFound:
+ msg = _("%s not found") % value
+ raise exceptions.NotFound(msg)
+
+ return ret
+
+ # Security Groups
+
+ def security_group_create(
+ self,
+ name=None,
+ description=None,
+ ):
+ """Create a new security group
+
+ https://developer.openstack.org/api-ref/compute/#create-security-group
+
+ :param string name:
+ Security group name
+ :param integer description:
+ Security group description
+ """
+
+ url = "/os-security-groups"
+
+ params = {
+ 'name': name,
+ 'description': description,
+ }
+
+ return self.create(
+ url,
+ json={'security_group': params},
+ )['security_group']
+
+ def security_group_delete(
+ self,
+ security_group=None,
+ ):
+ """Delete a security group
+
+ https://developer.openstack.org/api-ref/compute/#delete-security-group
+
+ :param string security_group:
+ Security group name or ID
+ """
+
+ url = "/os-security-groups"
+
+ security_group = self.find(
+ url,
+ attr='name',
+ value=security_group,
+ )['id']
+ if security_group is not None:
+ return self.delete('/%s/%s' % (url, security_group))
+
+ return None
+
+ def security_group_find(
+ self,
+ security_group=None,
+ ):
+ """Return a security group given name or ID
+
+ https://developer.openstack.org/api-ref/compute/#show-security-group-details
+
+ :param string security_group:
+ Security group name or ID
+ :returns: A dict of the security group attributes
+ """
+
+ url = "/os-security-groups"
+
+ return self.find(
+ url,
+ attr='name',
+ value=security_group,
+ )
+
+ def security_group_list(
+ self,
+ limit=None,
+ marker=None,
+ search_opts=None,
+ ):
+ """Get security groups
+
+ https://developer.openstack.org/api-ref/compute/#list-security-groups
+
+ :param integer limit:
+ query return count limit
+ :param string marker:
+ query marker
+ :param search_opts:
+ (undocumented) Search filter dict
+ all_tenants: True|False - return all projects
+ :returns:
+ list of security groups names
+ """
+
+ params = {}
+ if search_opts is not None:
+ params = dict((k, v) for (k, v) in search_opts.items() if v)
+ if limit:
+ params['limit'] = limit
+ if marker:
+ params['offset'] = marker
+
+ url = "/os-security-groups"
+ return self.list(url, **params)["security_groups"]
+
+ def security_group_set(
+ self,
+ security_group=None,
+ # name=None,
+ # description=None,
+ **params
+ ):
+ """Update a security group
+
+ https://developer.openstack.org/api-ref/compute/#update-security-group
+
+ :param string security_group:
+ Security group name or ID
+
+ TODO(dtroyer): Create an update method in osc-lib
+ """
+
+ # Short-circuit no-op
+ if params is None:
+ return None
+
+ url = "/os-security-groups"
+
+ security_group = self.find(
+ url,
+ attr='name',
+ value=security_group,
+ )
+ if security_group is not None:
+ for (k, v) in params.items():
+ # Only set a value if it is already present
+ if k in security_group:
+ security_group[k] = v
+ return self._request(
+ "PUT",
+ "/%s/%s" % (url, security_group['id']),
+ json={'security_group': security_group},
+ ).json()['security_group']
+ return None
diff --git a/openstackclient/compute/client.py b/openstackclient/compute/client.py
index b4b463b4..6abfef04 100644
--- a/openstackclient/compute/client.py
+++ b/openstackclient/compute/client.py
@@ -31,6 +31,11 @@ API_VERSIONS = {
"2.1": "novaclient.client",
}
+COMPUTE_API_TYPE = 'compute'
+COMPUTE_API_VERSIONS = {
+ '2': 'openstackclient.api.compute_v2.APIv2',
+}
+
# Save the microversion if in use
_compute_api_version = None
@@ -58,6 +63,13 @@ def make_client(instance):
LOG.debug('Instantiating compute client for %s', version)
+ compute_api = utils.get_client_class(
+ API_NAME,
+ version.ver_major,
+ COMPUTE_API_VERSIONS,
+ )
+ LOG.debug('Instantiating compute api: %s', compute_api)
+
# Set client http_log_debug to True if verbosity level is high enough
http_log_debug = utils.get_effective_log_level() <= logging.DEBUG
@@ -77,6 +89,16 @@ def make_client(instance):
**kwargs
)
+ client.api = compute_api(
+ session=instance.session,
+ service_type=COMPUTE_API_TYPE,
+ endpoint=instance.get_endpoint_for_service_type(
+ COMPUTE_API_TYPE,
+ region_name=instance.region_name,
+ interface=instance.interface,
+ )
+ )
+
return client
diff --git a/openstackclient/compute/v2/server.py b/openstackclient/compute/v2/server.py
index ae839677..7cd4588b 100644
--- a/openstackclient/compute/v2/server.py
+++ b/openstackclient/compute/v2/server.py
@@ -22,6 +22,7 @@ import logging
import os
import sys
+from novaclient.v2 import servers
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
@@ -29,11 +30,6 @@ from osc_lib import utils
from oslo_utils import timeutils
import six
-try:
- from novaclient.v2 import servers
-except ImportError:
- from novaclient.v1_1 import servers
-
from openstackclient.i18n import _
from openstackclient.identity import common as identity_common
@@ -316,12 +312,11 @@ class AddServerSecurityGroup(command.Command):
compute_client.servers,
parsed_args.server,
)
- security_group = utils.find_resource(
- compute_client.security_groups,
+ security_group = compute_client.api.security_group_find(
parsed_args.group,
)
- server.add_security_group(security_group.id)
+ server.add_security_group(security_group['id'])
class AddServerVolume(command.Command):
@@ -1437,12 +1432,11 @@ class RemoveServerSecurityGroup(command.Command):
compute_client.servers,
parsed_args.server,
)
- security_group = utils.find_resource(
- compute_client.security_groups,
+ security_group = compute_client.api.security_group_find(
parsed_args.group,
)
- server.remove_security_group(security_group.id)
+ server.remove_security_group(security_group['id'])
class RemoveServerVolume(command.Command):
diff --git a/openstackclient/network/v2/security_group.py b/openstackclient/network/v2/security_group.py
index 182d4817..75af3587 100644
--- a/openstackclient/network/v2/security_group.py
+++ b/openstackclient/network/v2/security_group.py
@@ -140,13 +140,13 @@ class CreateSecurityGroup(common.NetworkAndComputeShowOne):
def take_action_compute(self, client, parsed_args):
description = self._get_description(parsed_args)
- obj = client.security_groups.create(
+ obj = client.api.security_group_create(
parsed_args.name,
description,
)
display_columns, property_columns = _get_columns(obj)
data = utils.get_dict_properties(
- obj._info,
+ obj,
property_columns,
formatters=_formatters_compute
)
@@ -174,8 +174,7 @@ class DeleteSecurityGroup(common.NetworkAndComputeDelete):
client.delete_security_group(obj)
def take_action_compute(self, client, parsed_args):
- data = utils.find_resource(client.security_groups, self.r)
- client.security_groups.delete(data.id)
+ client.api.security_group_delete(self.r)
# TODO(rauta): Use the SDK resource mapped attribute names once
@@ -242,7 +241,10 @@ class ListSecurityGroup(common.NetworkAndComputeLister):
def take_action_compute(self, client, parsed_args):
search = {'all_tenants': parsed_args.all_projects}
- data = client.security_groups.list(search_opts=search)
+ data = client.api.security_group_list(
+ # TODO(dtroyer): add limit, marker
+ search_opts=search,
+ )
columns = (
"ID",
@@ -254,7 +256,7 @@ class ListSecurityGroup(common.NetworkAndComputeLister):
columns = columns + ('Tenant ID',)
column_headers = column_headers + ('Project',)
return (column_headers,
- (utils.get_item_properties(
+ (utils.get_dict_properties(
s, columns,
) for s in data))
@@ -294,23 +296,20 @@ class SetSecurityGroup(common.NetworkAndComputeCommand):
client.update_security_group(obj, **attrs)
def take_action_compute(self, client, parsed_args):
- data = utils.find_resource(
- client.security_groups,
- parsed_args.group,
- )
+ data = client.api.security_group_find(parsed_args.group)
if parsed_args.name is not None:
- data.name = parsed_args.name
+ data['name'] = parsed_args.name
if parsed_args.description is not None:
- data.description = parsed_args.description
+ data['description'] = parsed_args.description
# NOTE(rtheis): Previous behavior did not raise a CommandError
# if there were no updates. Maintain this behavior and issue
# the update.
- client.security_groups.update(
+ client.api.security_group_set(
data,
- data.name,
- data.description,
+ data['name'],
+ data['description'],
)
@@ -337,13 +336,10 @@ class ShowSecurityGroup(common.NetworkAndComputeShowOne):
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
- obj = utils.find_resource(
- client.security_groups,
- parsed_args.group,
- )
+ obj = client.api.security_group_find(parsed_args.group)
display_columns, property_columns = _get_columns(obj)
data = utils.get_dict_properties(
- obj._info,
+ obj,
property_columns,
formatters=_formatters_compute
)
diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py
index 8f07c5a4..33c3ff02 100644
--- a/openstackclient/network/v2/security_group_rule.py
+++ b/openstackclient/network/v2/security_group_rule.py
@@ -16,11 +16,6 @@
import argparse
import logging
-try:
- from novaclient.v2 import security_group_rules as compute_secgroup_rules
-except ImportError:
- from novaclient.v1_1 import security_group_rules as compute_secgroup_rules
-
from osc_lib.cli import parseractions
from osc_lib import exceptions
from osc_lib import utils
@@ -348,10 +343,7 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
return (display_columns, data)
def take_action_compute(self, client, parsed_args):
- group = utils.find_resource(
- client.security_groups,
- parsed_args.group,
- )
+ group = client.api.security_group_find(parsed_args.group)
protocol = self._get_protocol(parsed_args)
if protocol == 'icmp':
from_port, to_port = -1, -1
@@ -363,10 +355,9 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
remote_ip = None
if not (parsed_args.remote_group is None and
parsed_args.src_group is None):
- parsed_args.remote_group = utils.find_resource(
- client.security_groups,
+ parsed_args.remote_group = client.api.security_group_find(
parsed_args.remote_group or parsed_args.src_group,
- ).id
+ )['id']
if parsed_args.src_group:
LOG.warning(
_("The %(old)s option is deprecated, "
@@ -384,8 +375,9 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
)
else:
remote_ip = '0.0.0.0/0'
+
obj = client.security_group_rules.create(
- group.id,
+ group['id'],
protocol,
from_port,
to_port,
@@ -567,27 +559,29 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
rules_to_list = []
if parsed_args.group is not None:
- group = utils.find_resource(
- client.security_groups,
+ group = client.api.security_group_find(
parsed_args.group,
)
- rules_to_list = group.rules
+ rules_to_list = group['rules']
else:
columns = columns + ('parent_group_id',)
search = {'all_tenants': parsed_args.all_projects}
- for group in client.security_groups.list(search_opts=search):
- rules_to_list.extend(group.rules)
+ for group in client.api.security_group_list(search_opts=search):
+ rules_to_list.extend(group['rules'])
# NOTE(rtheis): Turn the raw rules into resources.
rules = []
for rule in rules_to_list:
- rules.append(compute_secgroup_rules.SecurityGroupRule(
- client.security_group_rules,
+ rules.append(
network_utils.transform_compute_security_group_rule(rule),
- ))
+ )
+ # rules.append(compute_secgroup_rules.SecurityGroupRule(
+ # client.security_group_rules,
+ # network_utils.transform_compute_security_group_rule(rule),
+ # ))
return (column_headers,
- (utils.get_item_properties(
+ (utils.get_dict_properties(
s, columns,
) for s in rules))
@@ -617,8 +611,8 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
# the requested rule.
obj = None
security_group_rules = []
- for security_group in client.security_groups.list():
- security_group_rules.extend(security_group.rules)
+ for security_group in client.api.security_group_list():
+ security_group_rules.extend(security_group['rules'])
for security_group_rule in security_group_rules:
if parsed_args.rule == str(security_group_rule.get('id')):
obj = security_group_rule
diff --git a/openstackclient/tests/functional/compute/v2/test_server.py b/openstackclient/tests/functional/compute/v2/test_server.py
index f152de80..dd257e9a 100644
--- a/openstackclient/tests/functional/compute/v2/test_server.py
+++ b/openstackclient/tests/functional/compute/v2/test_server.py
@@ -371,7 +371,7 @@ class ServerTests(common.ComputeTestCase):
server_name = uuid.uuid4().hex
server = json.loads(self.openstack(
# auto/none enable in nova micro version (v2.37+)
- '--os-compute-api-version 2.latest ' +
+ '--os-compute-api-version 2.37 ' +
'server create -f json ' +
'--flavor ' + self.flavor_name + ' ' +
'--image ' + self.image_name + ' ' +
@@ -394,7 +394,7 @@ class ServerTests(common.ComputeTestCase):
try:
self.openstack(
# auto/none enable in nova micro version (v2.37+)
- '--os-compute-api-version 2.latest ' +
+ '--os-compute-api-version 2.37 ' +
'server create -f json ' +
'--flavor ' + self.flavor_name + ' ' +
'--image ' + self.image_name + ' ' +
diff --git a/openstackclient/tests/unit/api/test_compute_v2.py b/openstackclient/tests/unit/api/test_compute_v2.py
new file mode 100644
index 00000000..f443e810
--- /dev/null
+++ b/openstackclient/tests/unit/api/test_compute_v2.py
@@ -0,0 +1,228 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Compute v2 API Library Tests"""
+
+from requests_mock.contrib import fixture
+
+from keystoneclient import session
+from openstackclient.api import compute_v2 as compute
+from openstackclient.tests.unit import utils
+from osc_lib import exceptions as osc_lib_exceptions
+
+
+FAKE_PROJECT = 'xyzpdq'
+FAKE_URL = 'http://gopher.com/v2'
+
+
+class TestComputeAPIv2(utils.TestCase):
+
+ def setUp(self):
+ super(TestComputeAPIv2, self).setUp()
+ sess = session.Session()
+ self.api = compute.APIv2(session=sess, endpoint=FAKE_URL)
+ self.requests_mock = self.useFixture(fixture.Fixture())
+
+
+class TestSecurityGroup(TestComputeAPIv2):
+
+ FAKE_SECURITY_GROUP_RESP = {
+ 'id': '1',
+ 'name': 'sg1',
+ 'description': 'test security group',
+ 'tenant_id': '0123456789',
+ 'rules': []
+ }
+ FAKE_SECURITY_GROUP_RESP_2 = {
+ 'id': '2',
+ 'name': 'sg2',
+ 'description': 'another test security group',
+ 'tenant_id': '0123456789',
+ 'rules': []
+ }
+ LIST_SECURITY_GROUP_RESP = [
+ FAKE_SECURITY_GROUP_RESP_2,
+ FAKE_SECURITY_GROUP_RESP,
+ ]
+
+ def test_security_group_create_default(self):
+ self.requests_mock.register_uri(
+ 'POST',
+ FAKE_URL + '/os-security-groups',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_create('sg1')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_create_options(self):
+ self.requests_mock.register_uri(
+ 'POST',
+ FAKE_URL + '/os-security-groups',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_create(
+ name='sg1',
+ description='desc',
+ )
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_delete_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'DELETE',
+ FAKE_URL + '/os-security-groups/1',
+ status_code=202,
+ )
+ ret = self.api.security_group_delete('1')
+ self.assertEqual(202, ret.status_code)
+ self.assertEqual("", ret.text)
+
+ def test_security_group_delete_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg1',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'DELETE',
+ FAKE_URL + '/os-security-groups/1',
+ status_code=202,
+ )
+ ret = self.api.security_group_delete('sg1')
+ self.assertEqual(202, ret.status_code)
+ self.assertEqual("", ret.text)
+
+ def test_security_group_delete_not_found(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg3',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.assertRaises(
+ osc_lib_exceptions.NotFound,
+ self.api.security_group_delete,
+ 'sg3',
+ )
+
+ def test_security_group_find_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_find('1')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_find_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg2',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_find('sg2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
+
+ def test_security_group_find_not_found(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg3',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.assertRaises(
+ osc_lib_exceptions.NotFound,
+ self.api.security_group_find,
+ 'sg3',
+ )
+
+ def test_security_group_list_no_options(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_list()
+ self.assertEqual(self.LIST_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_set_options_id(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'PUT',
+ FAKE_URL + '/os-security-groups/1',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ ret = self.api.security_group_set(
+ security_group='1',
+ description='desc2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
+
+ def test_security_group_set_options_name(self):
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups/sg2',
+ status_code=404,
+ )
+ self.requests_mock.register_uri(
+ 'GET',
+ FAKE_URL + '/os-security-groups',
+ json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
+ status_code=200,
+ )
+ self.requests_mock.register_uri(
+ 'PUT',
+ FAKE_URL + '/os-security-groups/2',
+ json={'security_group': self.FAKE_SECURITY_GROUP_RESP_2},
+ status_code=200,
+ )
+ ret = self.api.security_group_set(
+ security_group='sg2',
+ description='desc2')
+ self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
diff --git a/openstackclient/tests/unit/compute/v2/fakes.py b/openstackclient/tests/unit/compute/v2/fakes.py
index 4a194859..05cb5076 100644
--- a/openstackclient/tests/unit/compute/v2/fakes.py
+++ b/openstackclient/tests/unit/compute/v2/fakes.py
@@ -17,6 +17,7 @@ import copy
import mock
import uuid
+from openstackclient.api import compute_v2
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
from openstackclient.tests.unit.image.v2 import fakes as image_fakes
@@ -180,9 +181,6 @@ class FakeComputev2Client(object):
self.hypervisors_stats = mock.Mock()
self.hypervisors_stats.resource_class = fakes.FakeResource(None, {})
- self.security_groups = mock.Mock()
- self.security_groups.resource_class = fakes.FakeResource(None, {})
-
self.security_group_rules = mock.Mock()
self.security_group_rules.resource_class = fakes.FakeResource(None, {})
@@ -222,6 +220,11 @@ class TestComputev2(utils.TestCommand):
token=fakes.AUTH_TOKEN,
)
+ self.app.client_manager.compute.api = compute_v2.APIv2(
+ session=self.app.client_manager.session,
+ endpoint=fakes.AUTH_URL,
+ )
+
self.app.client_manager.identity = identity_fakes.FakeIdentityv2Client(
endpoint=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
@@ -485,11 +488,7 @@ class FakeSecurityGroup(object):
# Overwrite default attributes.
security_group_attrs.update(attrs)
-
- security_group = fakes.FakeResource(
- info=copy.deepcopy(security_group_attrs),
- loaded=True)
- return security_group
+ return security_group_attrs
@staticmethod
def create_security_groups(attrs=None, count=2):
diff --git a/openstackclient/tests/unit/compute/v2/test_server.py b/openstackclient/tests/unit/compute/v2/test_server.py
index fed847f1..71288a31 100644
--- a/openstackclient/tests/unit/compute/v2/test_server.py
+++ b/openstackclient/tests/unit/compute/v2/test_server.py
@@ -41,11 +41,6 @@ class TestServer(compute_fakes.TestComputev2):
self.flavors_mock = self.app.client_manager.compute.flavors
self.flavors_mock.reset_mock()
- # Get a shortcut to the compute client SecurityGroupManager Mock
- self.security_groups_mock = \
- self.app.client_manager.compute.security_groups
- self.security_groups_mock.reset_mock()
-
# Get a shortcut to the image client ImageManager Mock
self.images_mock = self.app.client_manager.image.images
self.images_mock.reset_mock()
@@ -232,6 +227,9 @@ class TestServerAddPort(TestServer):
self.find_port.assert_not_called()
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
class TestServerAddSecurityGroup(TestServer):
def setUp(self):
@@ -239,11 +237,9 @@ class TestServerAddSecurityGroup(TestServer):
self.security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
- # This is the return value for utils.find_resource() for security group
- self.security_groups_mock.get.return_value = self.security_group
attrs = {
- 'security_groups': [{'name': self.security_group.id}]
+ 'security_groups': [{'name': self.security_group['id']}]
}
methods = {
'add_security_group': None,
@@ -259,23 +255,24 @@ class TestServerAddSecurityGroup(TestServer):
# Get the command object to test
self.cmd = server.AddServerSecurityGroup(self.app, None)
- def test_server_add_security_group(self):
+ def test_server_add_security_group(self, sg_find_mock):
+ sg_find_mock.return_value = self.security_group
arglist = [
self.server.id,
- self.security_group.id
+ self.security_group['id']
]
verifylist = [
('server', self.server.id),
- ('group', self.security_group.id),
+ ('group', self.security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.security_groups_mock.get.assert_called_with(
- self.security_group.id,
+ sg_find_mock.assert_called_with(
+ self.security_group['id'],
)
self.servers_mock.get.assert_called_with(self.server.id)
self.server.add_security_group.assert_called_with(
- self.security_group.id,
+ self.security_group['id'],
)
self.assertIsNone(result)
@@ -1716,6 +1713,9 @@ class TestServerRemovePort(TestServer):
self.find_port.assert_not_called()
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
class TestServerRemoveSecurityGroup(TestServer):
def setUp(self):
@@ -1723,11 +1723,9 @@ class TestServerRemoveSecurityGroup(TestServer):
self.security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
- # This is the return value for utils.find_resource() for security group
- self.security_groups_mock.get.return_value = self.security_group
attrs = {
- 'security_groups': [{'name': self.security_group.id}]
+ 'security_groups': [{'name': self.security_group['id']}]
}
methods = {
'remove_security_group': None,
@@ -1743,23 +1741,24 @@ class TestServerRemoveSecurityGroup(TestServer):
# Get the command object to test
self.cmd = server.RemoveServerSecurityGroup(self.app, None)
- def test_server_remove_security_group(self):
+ def test_server_remove_security_group(self, sg_find_mock):
+ sg_find_mock.return_value = self.security_group
arglist = [
self.server.id,
- self.security_group.id
+ self.security_group['id']
]
verifylist = [
('server', self.server.id),
- ('group', self.security_group.id),
+ ('group', self.security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.security_groups_mock.get.assert_called_with(
- self.security_group.id,
+ sg_find_mock.assert_called_with(
+ self.security_group['id'],
)
self.servers_mock.get.assert_called_with(self.server.id)
self.server.remove_security_group.assert_called_with(
- self.security_group.id,
+ self.security_group['id'],
)
self.assertIsNone(result)
diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py
index f28f9103..999694b7 100644
--- a/openstackclient/tests/unit/fakes.py
+++ b/openstackclient/tests/unit/fakes.py
@@ -228,6 +228,9 @@ class FakeResource(object):
def get(self, item, default=None):
return self._info.get(item, default)
+ def pop(self, key, default_value=None):
+ return self.info.pop(key, default_value)
+
class FakeResponse(requests.Response):
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_compute.py
index 2fd44188..db9831bb 100644
--- a/openstackclient/tests/unit/network/v2/test_security_group_compute.py
+++ b/openstackclient/tests/unit/network/v2/test_security_group_compute.py
@@ -31,10 +31,14 @@ class TestSecurityGroupCompute(compute_fakes.TestComputev2):
self.compute = self.app.client_manager.compute
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_create'
+)
class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
project = identity_fakes.FakeProject.create_one_project()
domain = identity_fakes.FakeDomain.create_one_domain()
+
# The security group to be shown.
_security_group = \
compute_fakes.FakeSecurityGroup.create_one_security_group()
@@ -48,10 +52,10 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
)
data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
+ _security_group['description'],
+ _security_group['id'],
+ _security_group['name'],
+ _security_group['tenant_id'],
'',
)
@@ -60,61 +64,57 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.create.return_value = self._security_group
-
# Get the command object to test
self.cmd = security_group.CreateSecurityGroup(self.app, None)
- def test_create_no_options(self):
+ def test_security_group_create_no_options(self, sg_mock):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
- def test_create_network_options(self):
- arglist = [
- '--project', self.project.name,
- '--project-domain', self.domain.name,
- self._security_group.name,
- ]
- self.assertRaises(tests_utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
- def test_create_min_options(self):
+ def test_security_group_create_min_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
arglist = [
- self._security_group.name,
+ self._security_group['name'],
]
verifylist = [
- ('name', self._security_group.name),
+ ('name', self._security_group['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.name)
+ sg_mock.assert_called_once_with(
+ self._security_group['name'],
+ self._security_group['name'],
+ )
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
- def test_create_all_options(self):
+ def test_security_group_create_all_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
arglist = [
- '--description', self._security_group.description,
- self._security_group.name,
+ '--description', self._security_group['description'],
+ self._security_group['name'],
]
verifylist = [
- ('description', self._security_group.description),
- ('name', self._security_group.name),
+ ('description', self._security_group['description']),
+ ('name', self._security_group['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.create.assert_called_once_with(
- self._security_group.name,
- self._security_group.description)
+ sg_mock.assert_called_once_with(
+ self._security_group['name'],
+ self._security_group['description'],
+ )
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_delete'
+)
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
# The security groups to be deleted.
@@ -126,9 +126,7 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.delete = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = (
+ self.compute.api.security_group_find = (
compute_fakes.FakeSecurityGroup.get_security_groups(
self._security_groups)
)
@@ -136,27 +134,30 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
- def test_security_group_delete(self):
+ def test_security_group_delete(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
arglist = [
- self._security_groups[0].id,
+ self._security_groups[0]['id'],
]
verifylist = [
- ('group', [self._security_groups[0].id]),
+ ('group', [self._security_groups[0]['id']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id)
+ sg_mock.assert_called_once_with(
+ self._security_groups[0]['id'],
+ )
self.assertIsNone(result)
- def test_multi_security_groups_delete(self):
+ def test_security_group_multi_delete(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
arglist = []
verifylist = []
for s in self._security_groups:
- arglist.append(s.id)
+ arglist.append(s['id'])
verifylist = [
('group', arglist),
]
@@ -166,43 +167,39 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
calls = []
for s in self._security_groups:
- calls.append(call(s.id))
- self.compute.security_groups.delete.assert_has_calls(calls)
+ calls.append(call(s['id']))
+ sg_mock.assert_has_calls(calls)
self.assertIsNone(result)
- def test_multi_security_groups_delete_with_exception(self):
+ def test_security_group_multi_delete_with_exception(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ sg_mock.side_effect = ([
+ mock.Mock(return_value=None),
+ exceptions.CommandError,
+ ])
arglist = [
- self._security_groups[0].id,
+ self._security_groups[0]['id'],
'unexist_security_group',
]
verifylist = [
('group',
- [self._security_groups[0].id, 'unexist_security_group']),
+ [self._security_groups[0]['id'], 'unexist_security_group']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
- find_mock_result = [self._security_groups[0], exceptions.CommandError]
- self.compute.security_groups.get = (
- mock.Mock(side_effect=find_mock_result)
- )
- self.compute.security_groups.find.side_effect = (
- exceptions.NotFound(None))
-
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 groups failed to delete.', str(e))
- self.compute.security_groups.get.assert_any_call(
- self._security_groups[0].id)
- self.compute.security_groups.get.assert_any_call(
- 'unexist_security_group')
- self.compute.security_groups.delete.assert_called_once_with(
- self._security_groups[0].id
- )
+ sg_mock.assert_any_call(self._security_groups[0]['id'])
+ sg_mock.assert_any_call('unexist_security_group')
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_list'
+)
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
# The security group to be listed.
@@ -224,29 +221,29 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
data = []
for grp in _security_groups:
data.append((
- grp.id,
- grp.name,
- grp.description,
+ grp['id'],
+ grp['name'],
+ grp['description'],
))
data_all_projects = []
for grp in _security_groups:
data_all_projects.append((
- grp.id,
- grp.name,
- grp.description,
- grp.tenant_id,
+ grp['id'],
+ grp['name'],
+ grp['description'],
+ grp['tenant_id'],
))
def setUp(self):
super(TestListSecurityGroupCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.list.return_value = self._security_groups
# Get the command object to test
self.cmd = security_group.ListSecurityGroup(self.app, None)
- def test_security_group_list_no_options(self):
+ def test_security_group_list_no_options(self, sg_mock):
+ sg_mock.return_value = self._security_groups
arglist = []
verifylist = [
('all_projects', False),
@@ -256,11 +253,12 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': False}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
+ sg_mock.assert_called_once_with(**kwargs)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
- def test_security_group_list_all_projects(self):
+ def test_security_group_list_all_projects(self, sg_mock):
+ sg_mock.return_value = self._security_groups
arglist = [
'--all-projects',
]
@@ -272,11 +270,14 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
columns, data = self.cmd.take_action(parsed_args)
kwargs = {'search_opts': {'all_tenants': True}}
- self.compute.security_groups.list.assert_called_once_with(**kwargs)
+ sg_mock.assert_called_once_with(**kwargs)
self.assertEqual(self.columns_all_projects, columns)
self.assertEqual(self.data_all_projects, list(data))
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_set'
+)
class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
# The security group to be set.
@@ -288,54 +289,54 @@ class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.update = mock.Mock(return_value=None)
-
- self.compute.security_groups.get = mock.Mock(
+ self.compute.api.security_group_find = mock.Mock(
return_value=self._security_group)
# Get the command object to test
self.cmd = security_group.SetSecurityGroup(self.app, None)
- def test_set_no_options(self):
+ def test_security_group_set_no_options(self, sg_mock):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
- def test_set_no_updates(self):
+ def test_security_group_set_no_updates(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
arglist = [
- self._security_group.name,
+ self._security_group['name'],
]
verifylist = [
- ('group', self._security_group.name),
+ ('group', self._security_group['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.compute.security_groups.update.assert_called_once_with(
+ sg_mock.assert_called_once_with(
self._security_group,
- self._security_group.name,
- self._security_group.description
+ self._security_group['name'],
+ self._security_group['description'],
)
self.assertIsNone(result)
- def test_set_all_options(self):
- new_name = 'new-' + self._security_group.name
- new_description = 'new-' + self._security_group.description
+ def test_security_group_set_all_options(self, sg_mock):
+ sg_mock.return_value = mock.Mock(return_value=None)
+ new_name = 'new-' + self._security_group['name']
+ new_description = 'new-' + self._security_group['description']
arglist = [
'--name', new_name,
'--description', new_description,
- self._security_group.name,
+ self._security_group['name'],
]
verifylist = [
('description', new_description),
- ('group', self._security_group.name),
+ ('group', self._security_group['name']),
('name', new_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.compute.security_groups.update.assert_called_once_with(
+ sg_mock.assert_called_once_with(
self._security_group,
new_name,
new_description
@@ -343,6 +344,9 @@ class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
self.assertIsNone(result)
+@mock.patch(
+ 'openstackclient.api.compute_v2.APIv2.security_group_find'
+)
class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
# The security group rule to be shown with the group.
@@ -364,10 +368,10 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
)
data = (
- _security_group.description,
- _security_group.id,
- _security_group.name,
- _security_group.tenant_id,
+ _security_group['description'],
+ _security_group['id'],
+ _security_group['name'],
+ _security_group['tenant_id'],
security_group._format_compute_security_group_rules(
[_security_group_rule._info]),
)
@@ -377,27 +381,25 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.get.return_value = self._security_group
-
# Get the command object to test
self.cmd = security_group.ShowSecurityGroup(self.app, None)
- def test_show_no_options(self):
+ def test_security_group_show_no_options(self, sg_mock):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
- def test_show_all_options(self):
+ def test_security_group_show_all_options(self, sg_mock):
+ sg_mock.return_value = self._security_group
arglist = [
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id)
+ sg_mock.assert_called_once_with(self._security_group['id'])
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py
index 7833c0d9..06a7a25d 100644
--- a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py
+++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py
@@ -11,7 +11,6 @@
# under the License.
#
-import copy
import mock
from mock import call
@@ -20,7 +19,6 @@ from osc_lib import exceptions
from openstackclient.network import utils as network_utils
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
-from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit import utils as tests_utils
@@ -38,6 +36,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
project = identity_fakes.FakeProject.create_one_project()
domain = identity_fakes.FakeDomain.create_one_domain()
+
# The security group rule to be created.
_security_group_rule = None
@@ -61,51 +60,53 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.get.return_value = self._security_group
+ self.compute.api.security_group_find = mock.Mock(
+ return_value=self._security_group,
+ )
# Get the command object to test
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
- def test_create_no_options(self):
+ def test_security_group_rule_create_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
- def test_create_all_source_options(self):
+ def test_security_group_rule_create_all_source_options(self):
arglist = [
'--src-ip', '10.10.0.0/24',
- '--src-group', self._security_group.id,
- self._security_group.id,
+ '--src-group', self._security_group['id'],
+ self._security_group['id'],
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_all_remote_options(self):
+ def test_security_group_rule_create_all_remote_options(self):
arglist = [
'--remote-ip', '10.10.0.0/24',
- '--remote-group', self._security_group.id,
- self._security_group.id,
+ '--remote-group', self._security_group['id'],
+ self._security_group['id'],
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_bad_protocol(self):
+ def test_security_group_rule_create_bad_protocol(self):
arglist = [
'--protocol', 'foo',
- self._security_group.id,
+ self._security_group['id'],
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_all_protocol_options(self):
+ def test_security_group_rule_create_all_protocol_options(self):
arglist = [
'--protocol', 'tcp',
'--proto', 'tcp',
- self._security_group.id,
+ self._security_group['id'],
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_network_options(self):
+ def test_security_group_rule_create_network_options(self):
arglist = [
'--ingress',
'--ethertype', 'IPv4',
@@ -113,30 +114,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
'--icmp-code', '11',
'--project', self.project.name,
'--project-domain', self.domain.name,
- self._security_group.id,
+ self._security_group['id'],
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, [])
- def test_create_default_rule(self):
+ def test_security_group_rule_create_default_rule(self):
expected_columns, expected_data = self._setup_security_group_rule()
dst_port = str(self._security_group_rule.from_port) + ':' + \
str(self._security_group_rule.to_port)
arglist = [
'--dst-port', dst_port,
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
('dst_port', (self._security_group_rule.from_port,
self._security_group_rule.to_port)),
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
@@ -146,71 +149,75 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.assertEqual(expected_columns, columns)
self.assertEqual(expected_data, data)
- def test_create_source_group(self):
+ def test_security_group_rule_create_source_group(self):
expected_columns, expected_data = self._setup_security_group_rule({
'from_port': 22,
'to_port': 22,
- 'group': {'name': self._security_group.name},
+ 'group': {'name': self._security_group['name']},
})
arglist = [
'--dst-port', str(self._security_group_rule.from_port),
- '--src-group', self._security_group.name,
- self._security_group.id,
+ '--src-group', self._security_group['name'],
+ self._security_group['id'],
]
verifylist = [
('dst_port', (self._security_group_rule.from_port,
self._security_group_rule.to_port)),
- ('src_group', self._security_group.name),
- ('group', self._security_group.id),
+ ('src_group', self._security_group['name']),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
+ self._security_group['id'],
)
self.assertEqual(expected_columns, columns)
self.assertEqual(expected_data, data)
- def test_create_remote_group(self):
+ def test_security_group_rule_create_remote_group(self):
expected_columns, expected_data = self._setup_security_group_rule({
'from_port': 22,
'to_port': 22,
- 'group': {'name': self._security_group.name},
+ 'group': {'name': self._security_group['name']},
})
arglist = [
'--dst-port', str(self._security_group_rule.from_port),
- '--remote-group', self._security_group.name,
- self._security_group.id,
+ '--remote-group', self._security_group['name'],
+ self._security_group['id'],
]
verifylist = [
('dst_port', (self._security_group_rule.from_port,
self._security_group_rule.to_port)),
- ('remote_group', self._security_group.name),
- ('group', self._security_group.id),
+ ('remote_group', self._security_group['name']),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
+ self._security_group['id'],
)
self.assertEqual(expected_columns, columns)
self.assertEqual(expected_data, data)
- def test_create_source_ip(self):
+ def test_security_group_rule_create_source_ip(self):
expected_columns, expected_data = self._setup_security_group_rule({
'ip_protocol': 'icmp',
'from_port': -1,
@@ -220,19 +227,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
arglist = [
'--protocol', self._security_group_rule.ip_protocol,
'--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
('protocol', self._security_group_rule.ip_protocol),
('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
@@ -242,7 +251,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.assertEqual(expected_columns, columns)
self.assertEqual(expected_data, data)
- def test_create_remote_ip(self):
+ def test_security_group_rule_create_remote_ip(self):
expected_columns, expected_data = self._setup_security_group_rule({
'ip_protocol': 'icmp',
'from_port': -1,
@@ -252,19 +261,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
arglist = [
'--protocol', self._security_group_rule.ip_protocol,
'--remote-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
('protocol', self._security_group_rule.ip_protocol),
('remote_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
@@ -274,7 +285,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.assertEqual(expected_columns, columns)
self.assertEqual(expected_data, data)
- def test_create_proto_option(self):
+ def test_security_group_rule_create_proto_option(self):
expected_columns, expected_data = self._setup_security_group_rule({
'ip_protocol': 'icmp',
'from_port': -1,
@@ -284,20 +295,22 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
arglist = [
'--proto', self._security_group_rule.ip_protocol,
'--src-ip', self._security_group_rule.ip_range['cidr'],
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
('proto', self._security_group_rule.ip_protocol),
('protocol', None),
('src_ip', self._security_group_rule.ip_range['cidr']),
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
+ # TODO(dtroyer): save this for the security group rule changes
+ # self.compute.api.security_group_rule_create.assert_called_once_with(
self.compute.security_group_rules.create.assert_called_once_with(
- self._security_group.id,
+ self._security_group['id'],
self._security_group_rule.ip_protocol,
self._security_group_rule.from_port,
self._security_group_rule.to_port,
@@ -338,7 +351,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self._security_group_rules[0].id)
self.assertIsNone(result)
- def test_multi_security_group_rules_delete(self):
+ def test_security_group_rule_multi_delete(self):
arglist = []
verifylist = []
@@ -357,7 +370,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.compute.security_group_rules.delete.assert_has_calls(calls)
self.assertIsNone(result)
- def test_multi_security_group_rules_delete_with_exception(self):
+ def test_security_group_rule_multi_delete_with_exception(self):
arglist = [
self._security_group_rules[0].id,
'unexist_rule',
@@ -397,7 +410,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
'ip_protocol': 'tcp',
'from_port': 80,
'to_port': 80,
- 'group': {'name': _security_group.name},
+ 'group': {'name': _security_group['name']},
})
_security_group_rule_icmp = \
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
@@ -405,10 +418,12 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
'from_port': -1,
'to_port': -1,
'ip_range': {'cidr': '10.0.2.0/24'},
- 'group': {'name': _security_group.name},
+ 'group': {'name': _security_group['name']},
})
- _security_group.rules = [_security_group_rule_tcp._info,
- _security_group_rule_icmp._info]
+ _security_group['rules'] = [
+ _security_group_rule_tcp._info,
+ _security_group_rule_icmp._info,
+ ]
expected_columns_with_group = (
'ID',
@@ -422,7 +437,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
expected_data_with_group = []
expected_data_no_group = []
- for _security_group_rule in _security_group.rules:
+ for _security_group_rule in _security_group['rules']:
rule = network_utils.transform_compute_security_group_rule(
_security_group_rule
)
@@ -443,41 +458,43 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.app.client_manager.network_endpoint_enabled = False
- self.compute.security_groups.get.return_value = \
- self._security_group
- self.compute.security_groups.list.return_value = \
- [self._security_group]
+ self.compute.api.security_group_find = mock.Mock(
+ return_value=self._security_group,
+ )
+ self.compute.api.security_group_list = mock.Mock(
+ return_value=[self._security_group],
+ )
# Get the command object to test
self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
- def test_list_default(self):
+ def test_security_group_rule_list_default(self):
parsed_args = self.check_parser(self.cmd, [], [])
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
+ self.compute.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': False}
)
self.assertEqual(self.expected_columns_no_group, columns)
self.assertEqual(self.expected_data_no_group, list(data))
- def test_list_with_group(self):
+ def test_security_group_rule_list_with_group(self):
arglist = [
- self._security_group.id,
+ self._security_group['id'],
]
verifylist = [
- ('group', self._security_group.id),
+ ('group', self._security_group['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.get.assert_called_once_with(
- self._security_group.id
+ self.compute.api.security_group_find.assert_called_once_with(
+ self._security_group['id']
)
self.assertEqual(self.expected_columns_with_group, columns)
self.assertEqual(self.expected_data_with_group, list(data))
- def test_list_all_projects(self):
+ def test_security_group_rule_list_all_projects(self):
arglist = [
'--all-projects',
]
@@ -487,13 +504,13 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
+ self.compute.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': True}
)
self.assertEqual(self.expected_columns_no_group, columns)
self.assertEqual(self.expected_data_no_group, list(data))
- def test_list_with_ignored_options(self):
+ def test_security_group_rule_list_with_ignored_options(self):
arglist = [
'--long',
]
@@ -503,7 +520,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with(
+ self.compute.api.security_group_list.assert_called_once_with(
search_opts={'all_tenants': False}
)
self.assertEqual(self.expected_columns_no_group, columns)
@@ -527,20 +544,19 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# Build a security group fake customized for this test.
security_group_rules = [self._security_group_rule._info]
- security_group = fakes.FakeResource(
- info=copy.deepcopy({'rules': security_group_rules}),
- loaded=True)
- security_group.rules = security_group_rules
- self.compute.security_groups.list.return_value = [security_group]
+ security_group = {'rules': security_group_rules}
+ self.compute.api.security_group_list = mock.Mock(
+ return_value=[security_group],
+ )
# Get the command object to test
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
- def test_show_no_options(self):
+ def test_security_group_rule_show_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
- def test_show_all_options(self):
+ def test_security_group_rule_show_all_options(self):
arglist = [
self._security_group_rule.id,
]
@@ -551,6 +567,6 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_once_with()
+ self.compute.api.security_group_list.assert_called_once_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)