summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorHuanxuan Ao <huanxuan.ao@easystack.cn>2016-06-12 12:50:30 +0800
committerHuanxuan Ao <huanxuan.ao@easystack.cn>2016-06-15 17:23:39 +0800
commit8e2f49fbf22c242270c8162254fc83fbb4580a24 (patch)
tree8885ba7e468cc5ff625a1919f7a95199fb1c0ef0 /openstackclient
parent0ec711c640120539e4968c81fd6ee158257845d4 (diff)
downloadpython-openstackclient-8e2f49fbf22c242270c8162254fc83fbb4580a24.tar.gz
Support bulk deletion for commands that exist in both network and compute.
Some delete commands in networkv2 are exist in both network and compute, They can use NetworkAndComputeDeleteclass to supprot bulk deletion and error handling and the codes are similar, so I change them all in this patch. The changed commands including: 1.floating ip delete 2.security group delete 3.security group rule delete Also, I update unit tests and docs for these commands in this patch. Change-Id: I6c94c3d10ba579ddd9b14d17673c821e3481fd8a Partially-Implements: blueprint multi-argument-network
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/network/v2/floating_ip.py18
-rw-r--r--openstackclient/network/v2/security_group.py18
-rw-r--r--openstackclient/network/v2/security_group_rule.py16
-rw-r--r--openstackclient/tests/compute/v2/fakes.py19
-rw-r--r--openstackclient/tests/network/v2/fakes.py39
-rw-r--r--openstackclient/tests/network/v2/test_floating_ip.py131
-rw-r--r--openstackclient/tests/network/v2/test_security_group.py139
-rw-r--r--openstackclient/tests/network/v2/test_security_group_rule.py127
8 files changed, 444 insertions, 63 deletions
diff --git a/openstackclient/network/v2/floating_ip.py b/openstackclient/network/v2/floating_ip.py
index c734c2ed..8fbf049e 100644
--- a/openstackclient/network/v2/floating_ip.py
+++ b/openstackclient/network/v2/floating_ip.py
@@ -110,26 +110,28 @@ class CreateFloatingIP(common.NetworkAndComputeShowOne):
return (columns, data)
-class DeleteFloatingIP(common.NetworkAndComputeCommand):
- """Delete floating IP"""
+class DeleteFloatingIP(common.NetworkAndComputeDelete):
+ """Delete floating IP(s)"""
+
+ # Used by base class to find resources in parsed_args.
+ resource = 'floating_ip'
+ r = None
def update_parser_common(self, parser):
parser.add_argument(
'floating_ip',
metavar="<floating-ip>",
- help=_("Floating IP to delete (IP address or ID)")
+ nargs="+",
+ help=_("Floating IP(s) to delete (IP address or ID)")
)
return parser
def take_action_network(self, client, parsed_args):
- obj = client.find_ip(parsed_args.floating_ip)
+ obj = client.find_ip(self.r, ignore_missing=False)
client.delete_ip(obj)
def take_action_compute(self, client, parsed_args):
- obj = utils.find_resource(
- client.floating_ips,
- parsed_args.floating_ip,
- )
+ obj = utils.find_resource(client.floating_ips, self.r)
client.floating_ips.delete(obj.id)
diff --git a/openstackclient/network/v2/security_group.py b/openstackclient/network/v2/security_group.py
index 95971800..f832f721 100644
--- a/openstackclient/network/v2/security_group.py
+++ b/openstackclient/network/v2/security_group.py
@@ -164,26 +164,28 @@ class CreateSecurityGroup(common.NetworkAndComputeShowOne):
return (display_columns, data)
-class DeleteSecurityGroup(common.NetworkAndComputeCommand):
- """Delete a security group"""
+class DeleteSecurityGroup(common.NetworkAndComputeDelete):
+ """Delete security group(s)"""
+
+ # Used by base class to find resources in parsed_args.
+ resource = 'group'
+ r = None
def update_parser_common(self, parser):
parser.add_argument(
'group',
metavar='<group>',
- help=_("Security group to delete (name or ID)")
+ nargs="+",
+ help=_("Security group(s) to delete (name or ID)"),
)
return parser
def take_action_network(self, client, parsed_args):
- obj = client.find_security_group(parsed_args.group)
+ obj = client.find_security_group(self.r, ignore_missing=False)
client.delete_security_group(obj)
def take_action_compute(self, client, parsed_args):
- data = utils.find_resource(
- client.security_groups,
- parsed_args.group,
- )
+ data = utils.find_resource(client.security_groups, self.r)
client.security_groups.delete(data.id)
diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py
index 7c810786..18863223 100644
--- a/openstackclient/network/v2/security_group_rule.py
+++ b/openstackclient/network/v2/security_group_rule.py
@@ -333,23 +333,29 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
return _format_security_group_rule_show(obj._info)
-class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
- """Delete a security group rule"""
+class DeleteSecurityGroupRule(common.NetworkAndComputeDelete):
+ """Delete security group rule(s)"""
+
+ # Used by base class to find resources in parsed_args.
+ resource = 'rule'
+ r = None
def update_parser_common(self, parser):
parser.add_argument(
'rule',
metavar='<rule>',
- help=_("Security group rule to delete (ID only)")
+ nargs="+",
+ help=_("Security group rule(s) to delete (ID only)")
)
return parser
def take_action_network(self, client, parsed_args):
- obj = client.find_security_group_rule(parsed_args.rule)
+ obj = client.find_security_group_rule(
+ self.r, ignore_missing=False)
client.delete_security_group_rule(obj)
def take_action_compute(self, client, parsed_args):
- client.security_group_rules.delete(parsed_args.rule)
+ client.security_group_rules.delete(self.r)
class ListSecurityGroupRule(common.NetworkAndComputeLister):
diff --git a/openstackclient/tests/compute/v2/fakes.py b/openstackclient/tests/compute/v2/fakes.py
index 9682eec4..60abb8ef 100644
--- a/openstackclient/tests/compute/v2/fakes.py
+++ b/openstackclient/tests/compute/v2/fakes.py
@@ -452,6 +452,25 @@ class FakeSecurityGroup(object):
return security_groups
+ @staticmethod
+ def get_security_groups(security_groups=None, count=2):
+ """Get an iterable MagicMock object with a list of faked security groups.
+
+ If security groups list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List security groups:
+ A list of FakeResource objects faking security groups
+ :param int count:
+ The number of security groups to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ security groups
+ """
+ if security_groups is None:
+ security_groups = FakeSecurityGroup.create_security_groups(count)
+ return mock.MagicMock(side_effect=security_groups)
+
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
index 9efbe8c6..6b09e297 100644
--- a/openstackclient/tests/network/v2/fakes.py
+++ b/openstackclient/tests/network/v2/fakes.py
@@ -611,6 +611,25 @@ class FakeSecurityGroup(object):
return security_groups
+ @staticmethod
+ def get_security_groups(security_groups=None, count=2):
+ """Get an iterable MagicMock object with a list of faked security groups.
+
+ If security groups list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
+
+ :param List security groups:
+ A list of FakeResource objects faking security groups
+ :param int count:
+ The number of security groups to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ security groups
+ """
+ if security_groups is None:
+ security_groups = FakeSecurityGroup.create_security_groups(count)
+ return mock.MagicMock(side_effect=security_groups)
+
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""
@@ -670,6 +689,26 @@ class FakeSecurityGroupRule(object):
return security_group_rules
+ @staticmethod
+ def get_security_group_rules(security_group_rules=None, count=2):
+ """Get an iterable MagicMock object with a list of faked security group rules.
+
+ If security group rules list is provided, then initialize the Mock
+ object with the list. Otherwise create one.
+
+ :param List security group rules:
+ A list of FakeResource objects faking security group rules
+ :param int count:
+ The number of security group rules to fake
+ :return:
+ An iterable Mock object with side_effect set to a list of faked
+ security group rules
+ """
+ if security_group_rules is None:
+ security_group_rules = (
+ FakeSecurityGroupRule.create_security_group_rules(count))
+ return mock.MagicMock(side_effect=security_group_rules)
+
class FakeSubnet(object):
"""Fake one or more subnets."""
diff --git a/openstackclient/tests/network/v2/test_floating_ip.py b/openstackclient/tests/network/v2/test_floating_ip.py
index f9ccfe1c..5cd5279a 100644
--- a/openstackclient/tests/network/v2/test_floating_ip.py
+++ b/openstackclient/tests/network/v2/test_floating_ip.py
@@ -12,6 +12,9 @@
#
import mock
+from mock import call
+
+from osc_lib import exceptions
from openstackclient.network.v2 import floating_ip
from openstackclient.tests.compute.v2 import fakes as compute_fakes
@@ -140,33 +143,84 @@ class TestCreateFloatingIPNetwork(TestFloatingIPNetwork):
class TestDeleteFloatingIPNetwork(TestFloatingIPNetwork):
- # The floating ip to be deleted.
- floating_ip = network_fakes.FakeFloatingIP.create_one_floating_ip()
+ # The floating ips to be deleted.
+ floating_ips = network_fakes.FakeFloatingIP.create_floating_ips(count=2)
def setUp(self):
super(TestDeleteFloatingIPNetwork, self).setUp()
self.network.delete_ip = mock.Mock(return_value=None)
- self.network.find_ip = mock.Mock(return_value=self.floating_ip)
+ self.network.find_ip = (
+ network_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
# Get the command object to test
self.cmd = floating_ip.DeleteFloatingIP(self.app, self.namespace)
def test_floating_ip_delete(self):
arglist = [
- self.floating_ip.id,
+ self.floating_ips[0].id,
]
verifylist = [
- ('floating_ip', self.floating_ip.id),
+ ('floating_ip', [self.floating_ips[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
- self.network.find_ip.assert_called_once_with(self.floating_ip.id)
- self.network.delete_ip.assert_called_once_with(self.floating_ip)
+ self.network.find_ip.assert_called_once_with(
+ self.floating_ips[0].id, ignore_missing=False)
+ self.network.delete_ip.assert_called_once_with(self.floating_ips[0])
self.assertIsNone(result)
+ def test_multi_floating_ips_delete(self):
+ arglist = []
+ verifylist = []
+
+ for f in self.floating_ips:
+ arglist.append(f.id)
+ verifylist = [
+ ('floating_ip', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for f in self.floating_ips:
+ calls.append(call(f))
+ self.network.delete_ip.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete_with_exception(self):
+ arglist = [
+ self.floating_ips[0].id,
+ 'unexist_floating_ip',
+ ]
+ verifylist = [
+ ('floating_ip',
+ [self.floating_ips[0].id, 'unexist_floating_ip']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.floating_ips[0], exceptions.CommandError]
+ self.network.find_ip = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 floating_ip failed to delete.', str(e))
+
+ self.network.find_ip.assert_any_call(
+ self.floating_ips[0].id, ignore_missing=False)
+ self.network.find_ip.assert_any_call(
+ 'unexist_floating_ip', ignore_missing=False)
+ self.network.delete_ip.assert_called_once_with(
+ self.floating_ips[0]
+ )
+
class TestListFloatingIPNetwork(TestFloatingIPNetwork):
@@ -335,8 +389,8 @@ class TestCreateFloatingIPCompute(TestFloatingIPCompute):
class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
- # The floating ip to be deleted.
- floating_ip = compute_fakes.FakeFloatingIP.create_one_floating_ip()
+ # The floating ips to be deleted.
+ floating_ips = compute_fakes.FakeFloatingIP.create_floating_ips(count=2)
def setUp(self):
super(TestDeleteFloatingIPCompute, self).setUp()
@@ -346,27 +400,78 @@ class TestDeleteFloatingIPCompute(TestFloatingIPCompute):
self.compute.floating_ips.delete.return_value = None
# Return value of utils.find_resource()
- self.compute.floating_ips.get.return_value = self.floating_ip
+ self.compute.floating_ips.get = (
+ compute_fakes.FakeFloatingIP.get_floating_ips(self.floating_ips))
# Get the command object to test
self.cmd = floating_ip.DeleteFloatingIP(self.app, None)
def test_floating_ip_delete(self):
arglist = [
- self.floating_ip.id,
+ self.floating_ips[0].id,
]
verifylist = [
- ('floating_ip', self.floating_ip.id),
+ ('floating_ip', [self.floating_ips[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.floating_ips.delete.assert_called_once_with(
- self.floating_ip.id
+ self.floating_ips[0].id
)
self.assertIsNone(result)
+ def test_multi_floating_ips_delete(self):
+ arglist = []
+ verifylist = []
+
+ for f in self.floating_ips:
+ arglist.append(f.id)
+ verifylist = [
+ ('floating_ip', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for f in self.floating_ips:
+ calls.append(call(f.id))
+ self.compute.floating_ips.delete.assert_has_calls(calls)
+ self.assertIsNone(result)
+
+ def test_multi_floating_ips_delete_with_exception(self):
+ arglist = [
+ self.floating_ips[0].id,
+ 'unexist_floating_ip',
+ ]
+ verifylist = [
+ ('floating_ip',
+ [self.floating_ips[0].id, 'unexist_floating_ip']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self.floating_ips[0], exceptions.CommandError]
+ self.compute.floating_ips.get = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+ self.compute.floating_ips.find.side_effect = exceptions.NotFound(None)
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 floating_ip failed to delete.', str(e))
+
+ self.compute.floating_ips.get.assert_any_call(
+ self.floating_ips[0].id)
+ self.compute.floating_ips.get.assert_any_call(
+ 'unexist_floating_ip')
+ self.compute.floating_ips.delete.assert_called_once_with(
+ self.floating_ips[0].id
+ )
+
class TestListFloatingIPCompute(TestFloatingIPCompute):
diff --git a/openstackclient/tests/network/v2/test_security_group.py b/openstackclient/tests/network/v2/test_security_group.py
index 213367a4..b0c14985 100644
--- a/openstackclient/tests/network/v2/test_security_group.py
+++ b/openstackclient/tests/network/v2/test_security_group.py
@@ -13,6 +13,9 @@
import copy
import mock
+from mock import call
+
+from osc_lib import exceptions
from openstackclient.network.v2 import security_group
from openstackclient.tests.compute.v2 import fakes as compute_fakes
@@ -227,42 +230,93 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
class TestDeleteSecurityGroupNetwork(TestSecurityGroupNetwork):
- # The security group to be deleted.
- _security_group = \
- network_fakes.FakeSecurityGroup.create_one_security_group()
+ # The security groups to be deleted.
+ _security_groups = \
+ network_fakes.FakeSecurityGroup.create_security_groups()
def setUp(self):
super(TestDeleteSecurityGroupNetwork, self).setUp()
self.network.delete_security_group = mock.Mock(return_value=None)
- self.network.find_security_group = mock.Mock(
- return_value=self._security_group)
+ self.network.find_security_group = (
+ network_fakes.FakeSecurityGroup.get_security_groups(
+ self._security_groups)
+ )
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, self.namespace)
def test_security_group_delete(self):
arglist = [
- self._security_group.name,
+ self._security_groups[0].name,
]
verifylist = [
- ('group', self._security_group.name),
+ ('group', [self._security_groups[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_security_group.assert_called_once_with(
- self._security_group)
+ self._security_groups[0])
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_groups:
+ arglist.append(s.name)
+ verifylist = [
+ ('group', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_groups:
+ calls.append(call(s))
+ self.network.delete_security_group.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_multi_security_groups_delete_with_exception(self):
+ arglist = [
+ self._security_groups[0].name,
+ 'unexist_security_group',
+ ]
+ verifylist = [
+ ('group',
+ [self._security_groups[0].name, 'unexist_security_group']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._security_groups[0], exceptions.CommandError]
+ self.network.find_security_group = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 group failed to delete.', str(e))
+
+ self.network.find_security_group.assert_any_call(
+ self._security_groups[0].name, ignore_missing=False)
+ self.network.find_security_group.assert_any_call(
+ 'unexist_security_group', ignore_missing=False)
+ self.network.delete_security_group.assert_called_once_with(
+ self._security_groups[0]
+ )
+
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
- # The security group to be deleted.
- _security_group = \
- compute_fakes.FakeSecurityGroup.create_one_security_group()
+ # The security groups to be deleted.
+ _security_groups = \
+ compute_fakes.FakeSecurityGroup.create_security_groups()
def setUp(self):
super(TestDeleteSecurityGroupCompute, self).setUp()
@@ -271,27 +325,80 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
self.compute.security_groups.delete = mock.Mock(return_value=None)
- self.compute.security_groups.get = mock.Mock(
- return_value=self._security_group)
+ self.compute.security_groups.get = (
+ compute_fakes.FakeSecurityGroup.get_security_groups(
+ self._security_groups)
+ )
# Get the command object to test
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
def test_security_group_delete(self):
arglist = [
- self._security_group.name,
+ self._security_groups[0].id,
]
verifylist = [
- ('group', self._security_group.name),
+ ('group', [self._security_groups[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.security_groups.delete.assert_called_once_with(
- self._security_group.id)
+ self._security_groups[0].id)
+ self.assertIsNone(result)
+
+ def test_multi_security_groups_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_groups:
+ arglist.append(s.id)
+ verifylist = [
+ ('group', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_groups:
+ calls.append(call(s.id))
+ self.compute.security_groups.delete.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_multi_security_groups_delete_with_exception(self):
+ arglist = [
+ self._security_groups[0].id,
+ 'unexist_security_group',
+ ]
+ verifylist = [
+ ('group',
+ [self._security_groups[0].id, 'unexist_security_group']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [self._security_groups[0], exceptions.CommandError]
+ self.compute.security_groups.get = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+ self.compute.security_groups.find.side_effect = (
+ exceptions.NotFound(None))
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 group failed to delete.', str(e))
+
+ self.compute.security_groups.get.assert_any_call(
+ self._security_groups[0].id)
+ self.compute.security_groups.get.assert_any_call(
+ 'unexist_security_group')
+ self.compute.security_groups.delete.assert_called_once_with(
+ self._security_groups[0].id
+ )
+
class TestListSecurityGroupNetwork(TestSecurityGroupNetwork):
diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py
index b1f2209d..b2862679 100644
--- a/openstackclient/tests/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/network/v2/test_security_group_rule.py
@@ -13,6 +13,7 @@
import copy
import mock
+from mock import call
from osc_lib import exceptions
@@ -668,17 +669,20 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
- # The security group rule to be deleted.
- _security_group_rule = \
- network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+ # The security group rules to be deleted.
+ _security_group_rules = \
+ network_fakes.FakeSecurityGroupRule.create_security_group_rules(
+ count=2)
def setUp(self):
super(TestDeleteSecurityGroupRuleNetwork, self).setUp()
self.network.delete_security_group_rule = mock.Mock(return_value=None)
- self.network.find_security_group_rule = mock.Mock(
- return_value=self._security_group_rule)
+ self.network.find_security_group_rule = (
+ network_fakes.FakeSecurityGroupRule.get_security_group_rules(
+ self._security_group_rules)
+ )
# Get the command object to test
self.cmd = security_group_rule.DeleteSecurityGroupRule(
@@ -686,25 +690,76 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
def test_security_group_rule_delete(self):
arglist = [
- self._security_group_rule.id,
+ self._security_group_rules[0].id,
]
verifylist = [
- ('rule', self._security_group_rule.id),
+ ('rule', [self._security_group_rules[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_security_group_rule.assert_called_once_with(
- self._security_group_rule)
+ self._security_group_rules[0])
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_group_rules:
+ arglist.append(s.id)
+ verifylist = [
+ ('rule', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_group_rules:
+ calls.append(call(s))
+ self.network.delete_security_group_rule.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_multi_security_group_rules_delete_with_exception(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ 'unexist_rule',
+ ]
+ verifylist = [
+ ('rule',
+ [self._security_group_rules[0].id, 'unexist_rule']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [
+ self._security_group_rules[0], exceptions.CommandError]
+ self.network.find_security_group_rule = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 rule failed to delete.', str(e))
+
+ self.network.find_security_group_rule.assert_any_call(
+ self._security_group_rules[0].id, ignore_missing=False)
+ self.network.find_security_group_rule.assert_any_call(
+ 'unexist_rule', ignore_missing=False)
+ self.network.delete_security_group_rule.assert_called_once_with(
+ self._security_group_rules[0]
+ )
+
class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be deleted.
- _security_group_rule = \
- compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+ _security_group_rules = \
+ compute_fakes.FakeSecurityGroupRule.create_security_group_rules(
+ count=2)
def setUp(self):
super(TestDeleteSecurityGroupRuleCompute, self).setUp()
@@ -716,19 +771,65 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
def test_security_group_rule_delete(self):
arglist = [
- self._security_group_rule.id,
+ self._security_group_rules[0].id,
]
verifylist = [
- ('rule', self._security_group_rule.id),
+ ('rule', [self._security_group_rules[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.security_group_rules.delete.assert_called_once_with(
- self._security_group_rule.id)
+ self._security_group_rules[0].id)
+ self.assertIsNone(result)
+
+ def test_multi_security_group_rules_delete(self):
+ arglist = []
+ verifylist = []
+
+ for s in self._security_group_rules:
+ arglist.append(s.id)
+ verifylist = [
+ ('rule', arglist),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+
+ calls = []
+ for s in self._security_group_rules:
+ calls.append(call(s.id))
+ self.compute.security_group_rules.delete.assert_has_calls(calls)
self.assertIsNone(result)
+ def test_multi_security_group_rules_delete_with_exception(self):
+ arglist = [
+ self._security_group_rules[0].id,
+ 'unexist_rule',
+ ]
+ verifylist = [
+ ('rule',
+ [self._security_group_rules[0].id, 'unexist_rule']),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ find_mock_result = [None, exceptions.CommandError]
+ self.compute.security_group_rules.delete = (
+ mock.MagicMock(side_effect=find_mock_result)
+ )
+
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual('1 of 2 rule failed to delete.', str(e))
+
+ self.compute.security_group_rules.delete.assert_any_call(
+ self._security_group_rules[0].id)
+ self.compute.security_group_rules.delete.assert_any_call(
+ 'unexist_rule')
+
class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):