diff options
| author | Jenkins <jenkins@review.openstack.org> | 2016-07-29 15:47:07 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2016-07-29 15:47:07 +0000 |
| commit | fe650e204bf0ce7ca82f12b56da02dde98eeff15 (patch) | |
| tree | e158387344d76bbaa7624513053ff90137a0955f /openstackclient | |
| parent | b7b140df06faa1ff451ca5e50f3690191cb316cb (diff) | |
| parent | 13bc3793e0f0378db0151acb171dbe5f2d9c08dd (diff) | |
| download | python-openstackclient-fe650e204bf0ce7ca82f12b56da02dde98eeff15.tar.gz | |
Merge "Implement network rbac create and delete commands"
Diffstat (limited to 'openstackclient')
| -rw-r--r-- | openstackclient/network/v2/network_rbac.py | 132 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/fakes.py | 19 | ||||
| -rw-r--r-- | openstackclient/tests/network/v2/test_network_rbac.py | 258 |
3 files changed, 406 insertions, 3 deletions
diff --git a/openstackclient/network/v2/network_rbac.py b/openstackclient/network/v2/network_rbac.py index 7a759449..62968376 100644 --- a/openstackclient/network/v2/network_rbac.py +++ b/openstackclient/network/v2/network_rbac.py @@ -13,10 +13,17 @@ """RBAC action implementations""" +import logging + from osc_lib.command import command +from osc_lib import exceptions from osc_lib import utils from openstackclient.i18n import _ +from openstackclient.identity import common as identity_common + + +LOG = logging.getLogger(__name__) def _get_columns(item): @@ -30,6 +37,131 @@ def _get_columns(item): return tuple(sorted(columns)) +def _get_attrs(client_manager, parsed_args): + attrs = {} + attrs['object_type'] = parsed_args.type + attrs['action'] = parsed_args.action + + network_client = client_manager.network + if parsed_args.type == 'network': + object_id = network_client.find_network( + parsed_args.rbac_object, ignore_missing=False).id + if parsed_args.type == 'qos_policy': + # TODO(Huanxuan Ao): Support finding a object ID by obejct name + # after qos policy finding supported in SDK. + object_id = parsed_args.rbac_object + attrs['object_id'] = object_id + + identity_client = client_manager.identity + project_id = identity_common.find_project( + identity_client, + parsed_args.target_project, + parsed_args.target_project_domain, + ).id + attrs['target_tenant'] = project_id + if parsed_args.project is not None: + project_id = identity_common.find_project( + identity_client, + parsed_args.project, + parsed_args.project_domain, + ).id + attrs['tenant_id'] = project_id + + return attrs + + +class CreateNetworkRBAC(command.ShowOne): + """Create network RBAC policy""" + + def get_parser(self, prog_name): + parser = super(CreateNetworkRBAC, self).get_parser(prog_name) + parser.add_argument( + 'rbac_object', + metavar="<rbac-object>", + help=_("The object to which this RBAC policy affects (name or " + "ID for network objects, ID only for QoS policy objects)") + ) + parser.add_argument( + '--type', + metavar="<type>", + required=True, + choices=['qos_policy', 'network'], + help=_('Type of the object that RBAC policy ' + 'affects ("qos_policy" or "network")') + ) + parser.add_argument( + '--action', + metavar="<action>", + required=True, + choices=['access_as_external', 'access_as_shared'], + help=_('Action for the RBAC policy ' + '("access_as_external" or "access_as_shared")') + ) + parser.add_argument( + '--target-project', + required=True, + metavar="<target-project>", + help=_('The project to which the RBAC policy ' + 'will be enforced (name or ID)') + ) + parser.add_argument( + '--target-project-domain', + metavar='<target-project-domain>', + help=_('Domain the target project belongs to (name or ID). ' + 'This can be used in case collisions between project names ' + 'exist.'), + ) + parser.add_argument( + '--project', + metavar="<project>", + help=_('The owner project (name or ID)') + ) + identity_common.add_project_domain_option_to_parser(parser) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.network + attrs = _get_attrs(self.app.client_manager, parsed_args) + obj = client.create_rbac_policy(**attrs) + columns = _get_columns(obj) + data = utils.get_item_properties(obj, columns) + return columns, data + + +class DeleteNetworkRBAC(command.Command): + """Delete network RBAC policy(s)""" + + def get_parser(self, prog_name): + parser = super(DeleteNetworkRBAC, self).get_parser(prog_name) + parser.add_argument( + 'rbac_policy', + metavar="<rbac-policy>", + nargs='+', + help=_("RBAC policy(s) to delete (ID only)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.network + result = 0 + + for rbac in parsed_args.rbac_policy: + try: + obj = client.find_rbac_policy(rbac, ignore_missing=False) + client.delete_rbac_policy(obj) + except Exception as e: + result += 1 + LOG.error(_("Failed to delete RBAC policy with " + "ID '%(rbac)s': %(e)s"), + {'rbac': rbac, 'e': e}) + + if result > 0: + total = len(parsed_args.rbac_policy) + msg = (_("%(result)s of %(total)s RBAC policies failed " + "to delete.") % {'result': result, 'total': total}) + raise exceptions.CommandError(msg) + + class ListNetworkRBAC(command.Lister): """List network RBAC policies""" diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py index 05752094..9182fe55 100644 --- a/openstackclient/tests/network/v2/fakes.py +++ b/openstackclient/tests/network/v2/fakes.py @@ -541,6 +541,25 @@ class FakeNetworkRBAC(object): return rbac_policies + @staticmethod + def get_network_rbacs(rbac_policies=None, count=2): + """Get an iterable MagicMock object with a list of faked rbac policies. + + If rbac policies list is provided, then initialize the Mock object + with the list. Otherwise create one. + + :param List rbac_policies: + A list of FakeResource objects faking rbac policies + :param int count: + The number of rbac policies to fake + :return: + An iterable Mock object with side_effect set to a list of faked + rbac policies + """ + if rbac_policies is None: + rbac_policies = FakeNetworkRBAC.create_network_rbacs(count) + return mock.MagicMock(side_effect=rbac_policies) + class FakeRouter(object): """Fake one or more routers.""" diff --git a/openstackclient/tests/network/v2/test_network_rbac.py b/openstackclient/tests/network/v2/test_network_rbac.py index 057e0adc..5d6e9cfa 100644 --- a/openstackclient/tests/network/v2/test_network_rbac.py +++ b/openstackclient/tests/network/v2/test_network_rbac.py @@ -12,8 +12,12 @@ # import mock +from mock import call + +from osc_lib import exceptions from openstackclient.network.v2 import network_rbac +from openstackclient.tests.identity.v3 import fakes as identity_fakes_v3 from openstackclient.tests.network.v2 import fakes as network_fakes from openstackclient.tests import utils as tests_utils @@ -25,6 +29,252 @@ class TestNetworkRBAC(network_fakes.TestNetworkV2): # Get a shortcut to the network client self.network = self.app.client_manager.network + # Get a shortcut to the ProjectManager Mock + self.projects_mock = self.app.client_manager.identity.projects + + +class TestCreateNetworkRBAC(TestNetworkRBAC): + + network_object = network_fakes.FakeNetwork.create_one_network() + project = identity_fakes_v3.FakeProject.create_one_project() + rbac_policy = network_fakes.FakeNetworkRBAC.create_one_network_rbac( + attrs={'tenant_id': project.id, + 'target_tenant': project.id, + 'object_id': network_object.id} + ) + + columns = ( + 'action', + 'id', + 'object_id', + 'object_type', + 'project_id', + 'target_project', + ) + + data = [ + rbac_policy.action, + rbac_policy.id, + rbac_policy.object_id, + rbac_policy.object_type, + rbac_policy.tenant_id, + rbac_policy.target_tenant, + ] + + def setUp(self): + super(TestCreateNetworkRBAC, self).setUp() + + # Get the command object to test + self.cmd = network_rbac.CreateNetworkRBAC(self.app, self.namespace) + + self.network.create_rbac_policy = mock.Mock( + return_value=self.rbac_policy) + self.network.find_network = mock.Mock( + return_value=self.network_object) + self.projects_mock.get.return_value = self.project + + def test_network_rbac_create_no_type(self): + arglist = [ + '--action', self.rbac_policy.action, + self.rbac_policy.object_id, + ] + verifylist = [ + ('action', self.rbac_policy.action), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_no_action(self): + arglist = [ + '--type', self.rbac_policy.object_type, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_invalid_type(self): + arglist = [ + '--action', self.rbac_policy.action, + '--type', 'invalid_type', + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('action', self.rbac_policy.action), + ('type', 'invalid_type'), + ('target-project', self.rbac_policy.target_tenant), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create_invalid_action(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', 'invalid_action', + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', 'invalid_action'), + ('target-project', self.rbac_policy.target_tenant), + ('rbac_policy', self.rbac_policy.id), + ] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_network_rbac_create(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', self.rbac_policy.action, + '--target-project', self.rbac_policy.target_tenant, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', self.rbac_policy.action), + ('target_project', self.rbac_policy.target_tenant), + ('rbac_object', self.rbac_policy.object_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_rbac_policy.assert_called_with(**{ + 'object_id': self.rbac_policy.object_id, + 'object_type': self.rbac_policy.object_type, + 'action': self.rbac_policy.action, + 'target_tenant': self.rbac_policy.target_tenant, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + def test_network_rbac_create_all_options(self): + arglist = [ + '--type', self.rbac_policy.object_type, + '--action', self.rbac_policy.action, + '--target-project', self.rbac_policy.target_tenant, + '--project', self.rbac_policy.tenant_id, + '--project-domain', self.project.domain_id, + '--target-project-domain', self.project.domain_id, + self.rbac_policy.object_id, + ] + verifylist = [ + ('type', self.rbac_policy.object_type), + ('action', self.rbac_policy.action), + ('target_project', self.rbac_policy.target_tenant), + ('project', self.rbac_policy.tenant_id), + ('project_domain', self.project.domain_id), + ('target_project_domain', self.project.domain_id), + ('rbac_object', self.rbac_policy.object_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.network.create_rbac_policy.assert_called_with(**{ + 'object_id': self.rbac_policy.object_id, + 'object_type': self.rbac_policy.object_type, + 'action': self.rbac_policy.action, + 'target_tenant': self.rbac_policy.target_tenant, + 'tenant_id': self.rbac_policy.tenant_id, + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestDeleteNetworkRBAC(TestNetworkRBAC): + + rbac_policies = network_fakes.FakeNetworkRBAC.create_network_rbacs(count=2) + + def setUp(self): + super(TestDeleteNetworkRBAC, self).setUp() + self.network.delete_rbac_policy = mock.Mock(return_value=None) + self.network.find_rbac_policy = ( + network_fakes.FakeNetworkRBAC.get_network_rbacs( + rbac_policies=self.rbac_policies) + ) + + # Get the command object to test + self.cmd = network_rbac.DeleteNetworkRBAC(self.app, self.namespace) + + def test_network_rbac_delete(self): + arglist = [ + self.rbac_policies[0].id, + ] + verifylist = [ + ('rbac_policy', [self.rbac_policies[0].id]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.network.find_rbac_policy.assert_called_once_with( + self.rbac_policies[0].id, ignore_missing=False) + self.network.delete_rbac_policy.assert_called_once_with( + self.rbac_policies[0]) + self.assertIsNone(result) + + def test_multi_network_rbacs_delete(self): + arglist = [] + verifylist = [] + + for r in self.rbac_policies: + arglist.append(r.id) + verifylist = [ + ('rbac_policy', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for r in self.rbac_policies: + calls.append(call(r)) + self.network.delete_rbac_policy.assert_has_calls(calls) + self.assertIsNone(result) + + def test_multi_network_policies_delete_with_exception(self): + arglist = [ + self.rbac_policies[0].id, + 'unexist_rbac_policy', + ] + verifylist = [ + ('rbac_policy', + [self.rbac_policies[0].id, 'unexist_rbac_policy']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [self.rbac_policies[0], exceptions.CommandError] + self.network.find_rbac_policy = ( + mock.MagicMock(side_effect=find_mock_result) + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 RBAC policies failed to delete.', str(e)) + + self.network.find_rbac_policy.assert_any_call( + self.rbac_policies[0].id, ignore_missing=False) + self.network.find_rbac_policy.assert_any_call( + 'unexist_rbac_policy', ignore_missing=False) + self.network.delete_rbac_policy.assert_called_once_with( + self.rbac_policies[0] + ) class TestListNetworkRABC(TestNetworkRBAC): @@ -107,16 +357,18 @@ class TestShowNetworkRBAC(TestNetworkRBAC): def test_network_rbac_show_all_options(self): arglist = [ - self.rbac_policy.object_id, + self.rbac_policy.id, + ] + verifylist = [ + ('rbac_policy', self.rbac_policy.id), ] - verifylist = [] parsed_args = self.check_parser(self.cmd, arglist, verifylist) # DisplayCommandBase.take_action() returns two tuples columns, data = self.cmd.take_action(parsed_args) self.network.find_rbac_policy.assert_called_with( - self.rbac_policy.object_id, ignore_missing=False + self.rbac_policy.id, ignore_missing=False ) self.assertEqual(self.columns, columns) self.assertEqual(self.data, list(data)) |
