summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-03-28 09:16:23 +0000
committerGerrit Code Review <review@openstack.org>2016-03-28 09:16:23 +0000
commitd5596862b1bbd9fa0bcd1aeb8befad7fab4e3d2a (patch)
treed393d598b862e9ff422e4ac825f796286889a126 /openstackclient
parentef1faf77925f70eb9feeb7bb416f4c9257a16bee (diff)
parentd90650796217fbb9cdd19297ee6ff59f0e009413 (diff)
downloadpython-openstackclient-d5596862b1bbd9fa0bcd1aeb8befad7fab4e3d2a.tar.gz
Merge "Refactor security group rule create to use SDK"
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/compute/v2/security_group.py65
-rw-r--r--openstackclient/network/v2/security_group_rule.py100
-rw-r--r--openstackclient/tests/compute/v2/fakes.py6
-rw-r--r--openstackclient/tests/compute/v2/test_security_group_rule.py266
-rw-r--r--openstackclient/tests/network/v2/fakes.py10
-rw-r--r--openstackclient/tests/network/v2/test_security_group_rule.py319
6 files changed, 423 insertions, 343 deletions
diff --git a/openstackclient/compute/v2/security_group.py b/openstackclient/compute/v2/security_group.py
index 734bd78e..ca3bf5dc 100644
--- a/openstackclient/compute/v2/security_group.py
+++ b/openstackclient/compute/v2/security_group.py
@@ -16,15 +16,12 @@
"""Compute v2 Security Group action implementations"""
-import six
-
try:
from novaclient.v2 import security_group_rules
except ImportError:
from novaclient.v1_1 import security_group_rules
from openstackclient.common import command
-from openstackclient.common import parseractions
from openstackclient.common import utils
@@ -56,68 +53,6 @@ def _xform_security_group_rule(sgroup):
return info
-class CreateSecurityGroupRule(command.ShowOne):
- """Create a new security group rule"""
-
- def get_parser(self, prog_name):
- parser = super(CreateSecurityGroupRule, self).get_parser(prog_name)
- parser.add_argument(
- 'group',
- metavar='<group>',
- help='Create rule in this security group (name or ID)',
- )
- parser.add_argument(
- "--proto",
- metavar="<proto>",
- default="tcp",
- help="IP protocol (icmp, tcp, udp; default: tcp)",
- )
- source_group = parser.add_mutually_exclusive_group()
- source_group.add_argument(
- "--src-ip",
- metavar="<ip-address>",
- default="0.0.0.0/0",
- help="Source IP address block (may use CIDR notation; default: "
- "0.0.0.0/0)",
- )
- source_group.add_argument(
- "--src-group",
- metavar="<group>",
- help="Source security group (ID only)",
- )
- parser.add_argument(
- "--dst-port",
- metavar="<port-range>",
- default=(0, 0),
- action=parseractions.RangeAction,
- help="Destination port, may be a range: 137:139 (default: 0; "
- "only required for proto tcp and udp)",
- )
- return parser
-
- def take_action(self, parsed_args):
- compute_client = self.app.client_manager.compute
- group = utils.find_resource(
- compute_client.security_groups,
- parsed_args.group,
- )
- if parsed_args.proto.lower() == 'icmp':
- from_port, to_port = -1, -1
- else:
- from_port, to_port = parsed_args.dst_port
- data = compute_client.security_group_rules.create(
- group.id,
- parsed_args.proto,
- from_port,
- to_port,
- parsed_args.src_ip,
- parsed_args.src_group,
- )
-
- info = _xform_security_group_rule(data._info)
- return zip(*sorted(six.iteritems(info)))
-
-
class ListSecurityGroupRule(command.Lister):
"""List security group rules"""
diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py
index 9309b326..e0244654 100644
--- a/openstackclient/network/v2/security_group_rule.py
+++ b/openstackclient/network/v2/security_group_rule.py
@@ -16,6 +16,7 @@
import six
from openstackclient.common import exceptions
+from openstackclient.common import parseractions
from openstackclient.common import utils
from openstackclient.network import common
from openstackclient.network import utils as network_utils
@@ -34,6 +35,105 @@ def _get_columns(item):
return tuple(sorted(columns))
+def _convert_to_lowercase(string):
+ return string.lower()
+
+
+class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
+ """Create a new security group rule"""
+
+ def update_parser_common(self, parser):
+ parser.add_argument(
+ 'group',
+ metavar='<group>',
+ help='Create rule in this security group (name or ID)',
+ )
+ # TODO(rtheis): Add support for additional protocols for network.
+ # Until then, continue enforcing the compute choices.
+ parser.add_argument(
+ "--proto",
+ metavar="<proto>",
+ default="tcp",
+ choices=['icmp', 'tcp', 'udp'],
+ type=_convert_to_lowercase,
+ help="IP protocol (icmp, tcp, udp; default: tcp)",
+ )
+ source_group = parser.add_mutually_exclusive_group()
+ source_group.add_argument(
+ "--src-ip",
+ metavar="<ip-address>",
+ default="0.0.0.0/0",
+ help="Source IP address block (may use CIDR notation; default: "
+ "0.0.0.0/0)",
+ )
+ source_group.add_argument(
+ "--src-group",
+ metavar="<group>",
+ help="Source security group (ID only)",
+ )
+ parser.add_argument(
+ "--dst-port",
+ metavar="<port-range>",
+ default=(0, 0),
+ action=parseractions.RangeAction,
+ help="Destination port, may be a single port or port range: "
+ "137:139 (only required for IP protocols tcp and udp)",
+ )
+ return parser
+
+ def take_action_network(self, client, parsed_args):
+ # Get the security group ID to hold the rule.
+ security_group_id = client.find_security_group(
+ parsed_args.group,
+ ignore_missing=False
+ ).id
+
+ # Build the create attributes.
+ attrs = {}
+ # TODO(rtheis): Add --direction option. Until then, continue
+ # with the default of 'ingress'.
+ attrs['direction'] = 'ingress'
+ # TODO(rtheis): Add --ethertype option. Until then, continue
+ # with the default of 'IPv4'
+ attrs['ethertype'] = 'IPv4'
+ # TODO(rtheis): Add port range support (type and code) for icmp
+ # protocol. Until then, continue ignoring the port range.
+ if parsed_args.proto != 'icmp':
+ attrs['port_range_min'] = parsed_args.dst_port[0]
+ attrs['port_range_max'] = parsed_args.dst_port[1]
+ attrs['protocol'] = parsed_args.proto
+ if parsed_args.src_group is not None:
+ attrs['remote_group_id'] = parsed_args.src_group
+ else:
+ attrs['remote_ip_prefix'] = parsed_args.src_ip
+ attrs['security_group_id'] = security_group_id
+
+ # Create and show the security group rule.
+ obj = client.create_security_group_rule(**attrs)
+ columns = _get_columns(obj)
+ data = utils.get_item_properties(obj, columns)
+ return (columns, data)
+
+ def take_action_compute(self, client, parsed_args):
+ group = utils.find_resource(
+ client.security_groups,
+ parsed_args.group,
+ )
+ if parsed_args.proto == 'icmp':
+ from_port, to_port = -1, -1
+ else:
+ from_port, to_port = parsed_args.dst_port
+ obj = client.security_group_rules.create(
+ group.id,
+ parsed_args.proto,
+ from_port,
+ to_port,
+ parsed_args.src_ip,
+ parsed_args.src_group,
+ )
+ return _format_security_group_rule_show(obj._info)
+
+
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
diff --git a/openstackclient/tests/compute/v2/fakes.py b/openstackclient/tests/compute/v2/fakes.py
index 809825d0..fe9b3c75 100644
--- a/openstackclient/tests/compute/v2/fakes.py
+++ b/openstackclient/tests/compute/v2/fakes.py
@@ -393,13 +393,13 @@ class FakeSecurityGroupRule(object):
# Set default attributes.
security_group_rule_attrs = {
- 'from_port': -1,
+ 'from_port': 0,
'group': {},
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
- 'ip_protocol': 'icmp',
+ 'ip_protocol': 'tcp',
'ip_range': {'cidr': '0.0.0.0/0'},
'parent_group_id': 'security-group-id-' + uuid.uuid4().hex,
- 'to_port': -1,
+ 'to_port': 0,
}
# Overwrite default attributes.
diff --git a/openstackclient/tests/compute/v2/test_security_group_rule.py b/openstackclient/tests/compute/v2/test_security_group_rule.py
index 9a8003f3..42bf2c26 100644
--- a/openstackclient/tests/compute/v2/test_security_group_rule.py
+++ b/openstackclient/tests/compute/v2/test_security_group_rule.py
@@ -17,7 +17,6 @@ from openstackclient.compute.v2 import security_group
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests import fakes
from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
-from openstackclient.tests import utils
security_group_id = '11'
@@ -111,271 +110,6 @@ class TestSecurityGroupRule(compute_fakes.TestComputev2):
self.sg_rules_mock.reset_mock()
-class TestSecurityGroupRuleCreate(TestSecurityGroupRule):
-
- columns = (
- 'id',
- 'ip_protocol',
- 'ip_range',
- 'parent_group_id',
- 'port_range',
- 'remote_security_group',
- )
-
- def setUp(self):
- super(TestSecurityGroupRuleCreate, self).setUp()
-
- self.secgroups_mock.get.return_value = FakeSecurityGroupRuleResource(
- None,
- copy.deepcopy(SECURITY_GROUP),
- loaded=True,
- )
-
- # Get the command object to test
- self.cmd = security_group.CreateSecurityGroupRule(self.app, None)
-
- def test_security_group_rule_create_no_options(self):
- self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
- None,
- copy.deepcopy(SECURITY_GROUP_RULE),
- loaded=True,
- )
-
- arglist = [
- security_group_name,
- ]
- verifylist = [
- ('group', security_group_name),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class ShowOne in cliff, abstract method take_action()
- # returns a two-part tuple with a tuple of column names and a tuple of
- # data to be shown.
- columns, data = self.cmd.take_action(parsed_args)
-
- # SecurityGroupManager.create(name, description)
- self.sg_rules_mock.create.assert_called_with(
- security_group_id,
- 'tcp',
- 0,
- 0,
- security_group_rule_cidr,
- None,
- )
-
- self.assertEqual(self.columns, columns)
- datalist = (
- security_group_rule_id,
- 'tcp',
- security_group_rule_cidr,
- security_group_id,
- '0:0',
- '',
- )
- self.assertEqual(datalist, data)
-
- def test_security_group_rule_create_ftp(self):
- sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
- sg_rule['from_port'] = 20
- sg_rule['to_port'] = 21
- self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
- None,
- sg_rule,
- loaded=True,
- )
-
- arglist = [
- security_group_name,
- '--dst-port', '20:21',
- ]
- verifylist = [
- ('group', security_group_name),
- ('dst_port', (20, 21)),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class ShowOne in cliff, abstract method take_action()
- # returns a two-part tuple with a tuple of column names and a tuple of
- # data to be shown.
- columns, data = self.cmd.take_action(parsed_args)
-
- # SecurityGroupManager.create(name, description)
- self.sg_rules_mock.create.assert_called_with(
- security_group_id,
- 'tcp',
- 20,
- 21,
- security_group_rule_cidr,
- None,
- )
-
- self.assertEqual(self.columns, columns)
- datalist = (
- security_group_rule_id,
- 'tcp',
- security_group_rule_cidr,
- security_group_id,
- '20:21',
- '',
- )
- self.assertEqual(datalist, data)
-
- def test_security_group_rule_create_ssh(self):
- sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
- sg_rule['from_port'] = 22
- sg_rule['to_port'] = 22
- sg_rule['ip_range'] = {}
- sg_rule['group'] = {'name': security_group_name}
- self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
- None,
- sg_rule,
- loaded=True,
- )
-
- arglist = [
- security_group_name,
- '--dst-port', '22',
- '--src-group', security_group_id,
- ]
- verifylist = [
- ('group', security_group_name),
- ('dst_port', (22, 22)),
- ('src_group', security_group_id),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class ShowOne in cliff, abstract method take_action()
- # returns a two-part tuple with a tuple of column names and a tuple of
- # data to be shown.
- columns, data = self.cmd.take_action(parsed_args)
-
- # SecurityGroupManager.create(name, description)
- self.sg_rules_mock.create.assert_called_with(
- security_group_id,
- 'tcp',
- 22,
- 22,
- security_group_rule_cidr,
- security_group_id,
- )
-
- self.assertEqual(self.columns, columns)
- datalist = (
- security_group_rule_id,
- 'tcp',
- '',
- security_group_id,
- '22:22',
- security_group_name,
- )
- self.assertEqual(datalist, data)
-
- def test_security_group_rule_create_udp(self):
- sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
- sg_rule['ip_protocol'] = 'udp'
- self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
- None,
- sg_rule,
- loaded=True,
- )
-
- arglist = [
- security_group_name,
- '--proto', 'udp',
- ]
- verifylist = [
- ('group', security_group_name),
- ('proto', 'udp'),
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class ShowOne in cliff, abstract method take_action()
- # returns a two-part tuple with a tuple of column names and a tuple of
- # data to be shown.
- columns, data = self.cmd.take_action(parsed_args)
-
- # SecurityGroupManager.create(name, description)
- self.sg_rules_mock.create.assert_called_with(
- security_group_id,
- 'udp',
- 0,
- 0,
- security_group_rule_cidr,
- None,
- )
-
- self.assertEqual(self.columns, columns)
- datalist = (
- security_group_rule_id,
- 'udp',
- security_group_rule_cidr,
- security_group_id,
- '0:0',
- '',
- )
- self.assertEqual(datalist, data)
-
- def test_security_group_rule_create_icmp(self):
- sg_rule_cidr = '10.0.2.0/24'
- sg_rule = copy.deepcopy(SECURITY_GROUP_RULE_ICMP)
- sg_rule['ip_range'] = {'cidr': sg_rule_cidr}
- self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
- None,
- sg_rule,
- loaded=True,
- )
-
- arglist = [
- security_group_name,
- '--proto', 'ICMP',
- '--src-ip', sg_rule_cidr,
- ]
- verifylist = [
- ('group', security_group_name),
- ('proto', 'ICMP'),
- ('src_ip', sg_rule_cidr)
- ]
- parsed_args = self.check_parser(self.cmd, arglist, verifylist)
-
- # In base command class ShowOne in cliff, abstract method take_action()
- # returns a two-part tuple with a tuple of column names and a tuple of
- # data to be shown.
- columns, data = self.cmd.take_action(parsed_args)
-
- # SecurityGroupManager.create(name, description)
- self.sg_rules_mock.create.assert_called_with(
- security_group_id,
- 'ICMP',
- -1,
- -1,
- sg_rule_cidr,
- None,
- )
-
- self.assertEqual(self.columns, columns)
- datalist = (
- security_group_rule_id,
- 'icmp',
- sg_rule_cidr,
- security_group_id,
- '',
- '',
- )
- self.assertEqual(datalist, data)
-
- def test_security_group_rule_create_src_invalid(self):
- arglist = [
- security_group_name,
- '--proto', 'ICMP',
- '--src-ip', security_group_rule_cidr,
- '--src-group', security_group_id,
- ]
-
- self.assertRaises(utils.ParserException,
- self.check_parser, self.cmd, arglist, [])
-
-
class TestSecurityGroupRuleList(TestSecurityGroupRule):
def setUp(self):
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
index 71543274..e35fbe16 100644
--- a/openstackclient/tests/network/v2/fakes.py
+++ b/openstackclient/tests/network/v2/fakes.py
@@ -510,11 +510,11 @@ class FakeSecurityGroupRule(object):
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
- 'port_range_max': None,
- 'port_range_min': None,
- 'protocol': None,
- 'remote_group_id': 'remote-security-group-id-' + uuid.uuid4().hex,
- 'remote_ip_prefix': None,
+ 'port_range_max': 0,
+ 'port_range_min': 0,
+ 'protocol': 'tcp',
+ 'remote_group_id': None,
+ 'remote_ip_prefix': '0.0.0.0/0',
'security_group_id': 'security-group-id-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
}
diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py
index db15d0e2..a0d6185c 100644
--- a/openstackclient/tests/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/network/v2/test_security_group_rule.py
@@ -39,6 +39,317 @@ class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
self.compute = self.app.client_manager.compute
+class TestCreateSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ # The security group rule to be created.
+ _security_group_rule = None
+
+ # The security group that will contain the rule created.
+ _security_group = \
+ network_fakes.FakeSecurityGroup.create_one_security_group()
+
+ expected_columns = (
+ 'direction',
+ 'ethertype',
+ 'id',
+ 'port_range_max',
+ 'port_range_min',
+ 'project_id',
+ 'protocol',
+ 'remote_group_id',
+ 'remote_ip_prefix',
+ 'security_group_id',
+ )
+
+ expected_data = None
+
+ def _setup_security_group_rule(self, attrs=None):
+ self._security_group_rule = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
+ attrs)
+ self.network.create_security_group_rule = mock.Mock(
+ return_value=self._security_group_rule)
+ self.expected_data = (
+ self._security_group_rule.direction,
+ self._security_group_rule.ethertype,
+ self._security_group_rule.id,
+ self._security_group_rule.port_range_max,
+ self._security_group_rule.port_range_min,
+ self._security_group_rule.project_id,
+ self._security_group_rule.protocol,
+ self._security_group_rule.remote_group_id,
+ self._security_group_rule.remote_ip_prefix,
+ self._security_group_rule.security_group_id,
+ )
+
+ def setUp(self):
+ super(TestCreateSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.find_security_group = mock.Mock(
+ return_value=self._security_group)
+
+ # Get the command object to test
+ self.cmd = security_group_rule.CreateSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_source_group_and_ip(self):
+ arglist = [
+ '--src-ip', '10.10.0.0/24',
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_bad_protocol(self):
+ arglist = [
+ '--protocol', 'foo',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_default_rule(self):
+ self._setup_security_group_rule({
+ 'port_range_max': 443,
+ 'port_range_min': 443,
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.port_range_min),
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(tuple(self.expected_columns), columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_source_group(self):
+ self._setup_security_group_rule({
+ 'port_range_max': 22,
+ 'port_range_min': 22,
+ 'remote_group_id': self._security_group.id,
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.port_range_min),
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.port_range_min,
+ self._security_group_rule.port_range_max)),
+ ('src_group', self._security_group.id),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'port_range_max': self._security_group_rule.port_range_max,
+ 'port_range_min': self._security_group_rule.port_range_min,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_group_id': self._security_group_rule.remote_group_id,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(tuple(self.expected_columns), columns)
+ self.assertEqual(self.expected_data, data)
+
+ def test_create_source_ip(self):
+ self._setup_security_group_rule({
+ 'protocol': 'icmp',
+ 'port_range_max': -1,
+ 'port_range_min': -1,
+ 'remote_ip_prefix': '10.0.2.0/24',
+ })
+ arglist = [
+ '--proto', self._security_group_rule.protocol,
+ '--src-ip', self._security_group_rule.remote_ip_prefix,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.protocol),
+ ('src_ip', self._security_group_rule.remote_ip_prefix),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_security_group_rule.assert_called_once_with(**{
+ 'direction': self._security_group_rule.direction,
+ 'ethertype': self._security_group_rule.ethertype,
+ 'protocol': self._security_group_rule.protocol,
+ 'remote_ip_prefix': self._security_group_rule.remote_ip_prefix,
+ 'security_group_id': self._security_group.id,
+ })
+ self.assertEqual(tuple(self.expected_columns), columns)
+ self.assertEqual(self.expected_data, data)
+
+
+class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be created.
+ _security_group_rule = None
+
+ # The security group that will contain the rule created.
+ _security_group = \
+ compute_fakes.FakeSecurityGroup.create_one_security_group()
+
+ def _setup_security_group_rule(self, attrs=None):
+ self._security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule(
+ attrs)
+ self.compute.security_group_rules.create.return_value = \
+ self._security_group_rule
+ expected_columns, expected_data = \
+ security_group_rule._format_security_group_rule_show(
+ self._security_group_rule._info)
+ return expected_columns, expected_data
+
+ def setUp(self):
+ super(TestCreateSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ self.compute.security_groups.get.return_value = self._security_group
+
+ # Get the command object to test
+ self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
+
+ def test_create_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_create_source_group_and_ip(self):
+ arglist = [
+ '--src-ip', '10.10.0.0/24',
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_bad_protocol(self):
+ arglist = [
+ '--protocol', 'foo',
+ self._security_group.id,
+ ]
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, arglist, [])
+
+ def test_create_default_rule(self):
+ expected_columns, expected_data = self._setup_security_group_rule()
+ dst_port = str(self._security_group_rule.from_port) + ':' + \
+ str(self._security_group_rule.to_port)
+ arglist = [
+ '--dst-port', dst_port,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_source_group(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'from_port': 22,
+ 'to_port': 22,
+ 'group': {'name': self._security_group.id},
+ })
+ arglist = [
+ '--dst-port', str(self._security_group_rule.from_port),
+ '--src-group', self._security_group.id,
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('dst_port', (self._security_group_rule.from_port,
+ self._security_group_rule.to_port)),
+ ('src_group', self._security_group.id),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+ def test_create_source_ip(self):
+ expected_columns, expected_data = self._setup_security_group_rule({
+ 'ip_protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'ip_range': {'cidr': '10.0.2.0/24'},
+ })
+ arglist = [
+ '--proto', self._security_group_rule.ip_protocol,
+ '--src-ip', self._security_group_rule.ip_range['cidr'],
+ self._security_group.id,
+ ]
+ verifylist = [
+ ('proto', self._security_group_rule.ip_protocol),
+ ('src_ip', self._security_group_rule.ip_range['cidr']),
+ ('group', self._security_group.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_group_rules.create.assert_called_once_with(
+ self._security_group.id,
+ self._security_group_rule.ip_protocol,
+ self._security_group_rule.from_port,
+ self._security_group_rule.to_port,
+ self._security_group_rule.ip_range['cidr'],
+ None,
+ )
+ self.assertEqual(expected_columns, columns)
+ self.assertEqual(expected_data, data)
+
+
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be deleted.
@@ -68,7 +379,7 @@ class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
result = self.cmd.take_action(parsed_args)
- self.network.delete_security_group_rule.assert_called_with(
+ self.network.delete_security_group_rule.assert_called_once_with(
self._security_group_rule)
self.assertIsNone(result)
@@ -98,7 +409,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
result = self.cmd.take_action(parsed_args)
- self.compute.security_group_rules.delete.assert_called_with(
+ self.compute.security_group_rules.delete.assert_called_once_with(
self._security_group_rule.id)
self.assertIsNone(result)
@@ -160,7 +471,7 @@ class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
columns, data = self.cmd.take_action(parsed_args)
- self.network.find_security_group_rule.assert_called_with(
+ self.network.find_security_group_rule.assert_called_once_with(
self._security_group_rule.id, ignore_missing=False)
self.assertEqual(tuple(self.columns), columns)
self.assertEqual(self.data, data)
@@ -207,6 +518,6 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
columns, data = self.cmd.take_action(parsed_args)
- self.compute.security_groups.list.assert_called_with()
+ self.compute.security_groups.list.assert_called_once_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)