summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-07-29 15:47:07 +0000
committerGerrit Code Review <review@openstack.org>2016-07-29 15:47:07 +0000
commitfe650e204bf0ce7ca82f12b56da02dde98eeff15 (patch)
treee158387344d76bbaa7624513053ff90137a0955f /openstackclient
parentb7b140df06faa1ff451ca5e50f3690191cb316cb (diff)
parent13bc3793e0f0378db0151acb171dbe5f2d9c08dd (diff)
downloadpython-openstackclient-fe650e204bf0ce7ca82f12b56da02dde98eeff15.tar.gz
Merge "Implement network rbac create and delete commands"
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/network/v2/network_rbac.py132
-rw-r--r--openstackclient/tests/network/v2/fakes.py19
-rw-r--r--openstackclient/tests/network/v2/test_network_rbac.py258
3 files changed, 406 insertions, 3 deletions
diff --git a/openstackclient/network/v2/network_rbac.py b/openstackclient/network/v2/network_rbac.py
index 7a759449..62968376 100644
--- a/openstackclient/network/v2/network_rbac.py
+++ b/openstackclient/network/v2/network_rbac.py
@@ -13,10 +13,17 @@
"""RBAC action implementations"""
+import logging
+
from osc_lib.command import command
+from osc_lib import exceptions
from osc_lib import utils
from openstackclient.i18n import _
+from openstackclient.identity import common as identity_common
+
+
+LOG = logging.getLogger(__name__)
def _get_columns(item):
@@ -30,6 +37,131 @@ def _get_columns(item):
return tuple(sorted(columns))
+def _get_attrs(client_manager, parsed_args):
+ attrs = {}
+ attrs['object_type'] = parsed_args.type
+ attrs['action'] = parsed_args.action
+
+ network_client = client_manager.network
+ if parsed_args.type == 'network':
+ object_id = network_client.find_network(
+ parsed_args.rbac_object, ignore_missing=False).id
+ if parsed_args.type == 'qos_policy':
+ # TODO(Huanxuan Ao): Support finding a object ID by obejct name
+ # after qos policy finding supported in SDK.
+ object_id = parsed_args.rbac_object
+ attrs['object_id'] = object_id
+
+ identity_client = client_manager.identity
+ project_id = identity_common.find_project(
+ identity_client,
+ parsed_args.target_project,
+ parsed_args.target_project_domain,
+ ).id
+ attrs['target_tenant'] = project_id
+ if parsed_args.project is not None:
+ project_id = identity_common.find_project(
+ identity_client,
+ parsed_args.project,
+ parsed_args.project_domain,
+ ).id
+ attrs['tenant_id'] = project_id
+
+ return attrs
+
+
+class CreateNetworkRBAC(command.ShowOne):
+ """Create network RBAC policy"""
+
+ def get_parser(self, prog_name):
+ parser = super(CreateNetworkRBAC, self).get_parser(prog_name)
+ parser.add_argument(
+ 'rbac_object',
+ metavar="<rbac-object>",
+ help=_("The object to which this RBAC policy affects (name or "
+ "ID for network objects, ID only for QoS policy objects)")
+ )
+ parser.add_argument(
+ '--type',
+ metavar="<type>",
+ required=True,
+ choices=['qos_policy', 'network'],
+ help=_('Type of the object that RBAC policy '
+ 'affects ("qos_policy" or "network")')
+ )
+ parser.add_argument(
+ '--action',
+ metavar="<action>",
+ required=True,
+ choices=['access_as_external', 'access_as_shared'],
+ help=_('Action for the RBAC policy '
+ '("access_as_external" or "access_as_shared")')
+ )
+ parser.add_argument(
+ '--target-project',
+ required=True,
+ metavar="<target-project>",
+ help=_('The project to which the RBAC policy '
+ 'will be enforced (name or ID)')
+ )
+ parser.add_argument(
+ '--target-project-domain',
+ metavar='<target-project-domain>',
+ help=_('Domain the target project belongs to (name or ID). '
+ 'This can be used in case collisions between project names '
+ 'exist.'),
+ )
+ parser.add_argument(
+ '--project',
+ metavar="<project>",
+ help=_('The owner project (name or ID)')
+ )
+ identity_common.add_project_domain_option_to_parser(parser)
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.network
+ attrs = _get_attrs(self.app.client_manager, parsed_args)
+ obj = client.create_rbac_policy(**attrs)
+ columns = _get_columns(obj)
+ data = utils.get_item_properties(obj, columns)
+ return columns, data
+
+
+class DeleteNetworkRBAC(command.Command):
+ """Delete network RBAC policy(s)"""
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteNetworkRBAC, self).get_parser(prog_name)
+ parser.add_argument(
+ 'rbac_policy',
+ metavar="<rbac-policy>",
+ nargs='+',
+ help=_("RBAC policy(s) to delete (ID only)")
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.network
+ result = 0
+
+ for rbac in parsed_args.rbac_policy:
+ try:
+ obj = client.find_rbac_policy(rbac, ignore_missing=False)
+ client.delete_rbac_policy(obj)
+ except Exception as e:
+ result += 1
+ LOG.error(_("Failed to delete RBAC policy with "
+ "ID '%(rbac)s': %(e)s"),
+ {'rbac': rbac, 'e': e})
+
+ if result > 0:
+ total = len(parsed_args.rbac_policy)
+ msg = (_("%(result)s of %(total)s RBAC policies failed "
+ "to delete.") % {'result': result, 'total': total})
+ raise exceptions.CommandError(msg)
+
+
class ListNetworkRBAC(command.Lister):
"""List network RBAC policies"""
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
index 05752094..9182fe55 100644
--- a/openstackclient/tests/network/v2/fakes.py
+++ b/openstackclient/tests/network/v2/fakes.py
@@ -541,6 +541,25 @@ class FakeNetworkRBAC(object):
return rbac_policies
+ @staticmethod
+ def get_network_rbacs(rbac_policies=None, count=2):
+ """Get an iterable MagicMock object with a list of faked rbac policies.
+
+ If rbac policies list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List rbac_policies:
+ A list of FakeResource objects faking rbac policies
+ :param int count:
+ The number of rbac policies to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ rbac policies
+ """
+ if rbac_policies is None:
+ rbac_policies = FakeNetworkRBAC.create_network_rbacs(count)
+ return mock.MagicMock(side_effect=rbac_policies)
+
class FakeRouter(object):
"""Fake one or more routers."""
diff --git a/openstackclient/tests/network/v2/test_network_rbac.py b/openstackclient/tests/network/v2/test_network_rbac.py
index 057e0adc..5d6e9cfa 100644
--- a/openstackclient/tests/network/v2/test_network_rbac.py
+++ b/openstackclient/tests/network/v2/test_network_rbac.py
@@ -12,8 +12,12 @@
#
import mock
+from mock import call
+
+from osc_lib import exceptions
from openstackclient.network.v2 import network_rbac
+from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3
from openstackclient.tests.network.v2 import fakes as network_fakes
from openstackclient.tests import utils as tests_utils
@@ -25,6 +29,252 @@ class TestNetworkRBAC(network_fakes.TestNetworkV2):
# Get a shortcut to the network client
self.network = self.app.client_manager.network
+ # Get a shortcut to the ProjectManager Mock
+ self.projects_mock = self.app.client_manager.identity.projects
+
+
+class TestCreateNetworkRBAC(TestNetworkRBAC):
+
+ network_object = network_fakes.FakeNetwork.create_one_network()
+ project = identity_fakes_v3.FakeProject.create_one_project()
+ rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac(
+ attrs={'tenant_id': project.id,
+ 'target_tenant': project.id,
+ 'object_id': network_object.id}
+ )
+
+ columns = (
+ 'action',
+ 'id',
+ 'object_id',
+ 'object_type',
+ 'project_id',
+ 'target_project',
+ )
+
+ data = [
+ rbac_policy.action,
+ rbac_policy.id,
+ rbac_policy.object_id,
+ rbac_policy.object_type,
+ rbac_policy.tenant_id,
+ rbac_policy.target_tenant,
+ ]
+
+ def setUp(self):
+ super(TestCreateNetworkRBAC, self).setUp()
+
+ # Get the command object to test
+ self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace)
+
+ self.network.create_rbac_policy = mock.Mock(
+ return_value=self.rbac_policy)
+ self.network.find_network = mock.Mock(
+ return_value=self.network_object)
+ self.projects_mock.get.return_value = self.project
+
+ def test_network_rbac_create_no_type(self):
+ arglist = [
+ '--action', self.rbac_policy.action,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('action', self.rbac_policy.action),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_no_action(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_invalid_type(self):
+ arglist = [
+ '--action', self.rbac_policy.action,
+ '--type', 'invalid_type',
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('action', self.rbac_policy.action),
+ ('type', 'invalid_type'),
+ ('target-project', self.rbac_policy.target_tenant),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create_invalid_action(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', 'invalid_action',
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', 'invalid_action'),
+ ('target-project', self.rbac_policy.target_tenant),
+ ('rbac_policy', self.rbac_policy.id),
+ ]
+
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_network_rbac_create(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', self.rbac_policy.action,
+ '--target-project', self.rbac_policy.target_tenant,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', self.rbac_policy.action),
+ ('target_project', self.rbac_policy.target_tenant),
+ ('rbac_object', self.rbac_policy.object_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_rbac_policy.assert_called_with(**{
+ 'object_id': self.rbac_policy.object_id,
+ 'object_type': self.rbac_policy.object_type,
+ 'action': self.rbac_policy.action,
+ 'target_tenant': self.rbac_policy.target_tenant,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+ def test_network_rbac_create_all_options(self):
+ arglist = [
+ '--type', self.rbac_policy.object_type,
+ '--action', self.rbac_policy.action,
+ '--target-project', self.rbac_policy.target_tenant,
+ '--project', self.rbac_policy.tenant_id,
+ '--project-domain', self.project.domain_id,
+ '--target-project-domain', self.project.domain_id,
+ self.rbac_policy.object_id,
+ ]
+ verifylist = [
+ ('type', self.rbac_policy.object_type),
+ ('action', self.rbac_policy.action),
+ ('target_project', self.rbac_policy.target_tenant),
+ ('project', self.rbac_policy.tenant_id),
+ ('project_domain', self.project.domain_id),
+ ('target_project_domain', self.project.domain_id),
+ ('rbac_object', self.rbac_policy.object_id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_rbac_policy.assert_called_with(**{
+ 'object_id': self.rbac_policy.object_id,
+ 'object_type': self.rbac_policy.object_type,
+ 'action': self.rbac_policy.action,
+ 'target_tenant': self.rbac_policy.target_tenant,
+ 'tenant_id': self.rbac_policy.tenant_id,
+ })
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, list(data))
+
+
+class TestDeleteNetworkRBAC(TestNetworkRBAC):
+
+ rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2)
+
+ def setUp(self):
+ super(TestDeleteNetworkRBAC, self).setUp()
+ self.network.delete_rbac_policy = mock.Mock(return_value=None)
+ self.network.find_rbac_policy = (
+ network_fakes.FakeNetworkRBAC.get_network_rbacs(
+ rbac_policies=self.rbac_policies)
+ )
+
+ # Get the command object to test
+ self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace)
+
+ def test_network_rbac_delete(self):
+ arglist = [
+ self.rbac_policies[0].id,
+ ]
+ verifylist = [
+ ('rbac_policy', [self.rbac_policies[0].id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0].id, ignore_missing=False)
+ self.network.delete_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0])
+ self.assertIsNone(result)
+
+ def test_multi_network_rbacs_delete(self):
+ arglist = []
+ verifylist = []
+
+ for r in self.rbac_policies:
+ arglist.append(r.id)
+ verifylist = [
+ ('rbac_policy', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for r in self.rbac_policies:
+ calls.append(call(r))
+ self.network.delete_rbac_policy.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_network_policies_delete_with_exception(self):
+ arglist = [
+ self.rbac_policies[0].id,
+ 'unexist_rbac_policy',
+ ]
+ verifylist = [
+ ('rbac_policy',
+ [self.rbac_policies[0].id, 'unexist_rbac_policy']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.rbac_policies[0], exceptions.CommandError]
+ self.network.find_rbac_policy = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e))
+
+ self.network.find_rbac_policy.assert_any_call(
+ self.rbac_policies[0].id, ignore_missing=False)
+ self.network.find_rbac_policy.assert_any_call(
+ 'unexist_rbac_policy', ignore_missing=False)
+ self.network.delete_rbac_policy.assert_called_once_with(
+ self.rbac_policies[0]
+ )
class TestListNetworkRABC(TestNetworkRBAC):
@@ -107,16 +357,18 @@ class TestShowNetworkRBAC(TestNetworkRBAC):
def test_network_rbac_show_all_options(self):
arglist = [
- self.rbac_policy.object_id,
+ self.rbac_policy.id,
+ ]
+ verifylist = [
+ ('rbac_policy', self.rbac_policy.id),
]
- verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.network.find_rbac_policy.assert_called_with(
- self.rbac_policy.object_id, ignore_missing=False
+ self.rbac_policy.id, ignore_missing=False
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))