summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/compute/v2/security_group.py11
-rw-r--r--openstackclient/tests/compute/v2/test_security_group_rule.py338
2 files changed, 345 insertions, 4 deletions
diff --git a/openstackclient/compute/v2/security_group.py b/openstackclient/compute/v2/security_group.py
index 3dc9bae0..25c2ed3f 100644
--- a/openstackclient/compute/v2/security_group.py
+++ b/openstackclient/compute/v2/security_group.py
@@ -50,10 +50,10 @@ def _xform_security_group_rule(sgroup):
info['ip_range'] = info['ip_range']['cidr']
else:
info['ip_range'] = ''
- if info['ip_protocol'] == 'icmp':
- info['port_range'] = ''
- elif info['ip_protocol'] is None:
+ if info['ip_protocol'] is None:
info['ip_protocol'] = ''
+ elif info['ip_protocol'].lower() == 'icmp':
+ info['port_range'] = ''
return info
@@ -307,7 +307,10 @@ class CreateSecurityGroupRule(show.ShowOne):
compute_client.security_groups,
parsed_args.group,
)
- from_port, to_port = parsed_args.dst_port
+ if parsed_args.proto.lower() == 'icmp':
+ from_port, to_port = -1, -1
+ else:
+ from_port, to_port = parsed_args.dst_port
data = compute_client.security_group_rules.create(
group.id,
parsed_args.proto,
diff --git a/openstackclient/tests/compute/v2/test_security_group_rule.py b/openstackclient/tests/compute/v2/test_security_group_rule.py
new file mode 100644
index 00000000..9516f8dd
--- /dev/null
+++ b/openstackclient/tests/compute/v2/test_security_group_rule.py
@@ -0,0 +1,338 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+import mock
+
+from openstackclient.compute.v2 import security_group
+from openstackclient.tests.compute.v2 import fakes as compute_fakes
+from openstackclient.tests import fakes
+from openstackclient.tests.identity.v2_0 import fakes as identity_fakes
+
+
+security_group_id = '11'
+security_group_name = 'wide-open'
+security_group_description = 'nothing but net'
+
+security_group_rule_id = '1'
+
+SECURITY_GROUP = {
+ 'id': security_group_id,
+ 'name': security_group_name,
+ 'description': security_group_description,
+ 'tenant_id': identity_fakes.project_id,
+}
+
+SECURITY_GROUP_RULE = {
+ 'id': security_group_rule_id,
+ 'group': {},
+ 'ip_protocol': 'tcp',
+ 'ip_range': '0.0.0.0/0',
+ 'parent_group_id': security_group_id,
+ 'from_port': 0,
+ 'to_port': 0,
+}
+
+SECURITY_GROUP_RULE_ICMP = {
+ 'id': security_group_rule_id,
+ 'group': {},
+ 'ip_protocol': 'icmp',
+ 'ip_range': '0.0.0.0/0',
+ 'parent_group_id': security_group_id,
+ 'from_port': -1,
+ 'to_port': -1,
+}
+
+
+class FakeSecurityGroupRuleResource(fakes.FakeResource):
+
+ def get_keys(self):
+ return {'property': 'value'}
+
+
+class TestSecurityGroupRule(compute_fakes.TestComputev2):
+
+ def setUp(self):
+ super(TestSecurityGroupRule, self).setUp()
+
+ self.secgroups_mock = mock.Mock()
+ self.secgroups_mock.resource_class = fakes.FakeResource(None, {})
+ self.app.client_manager.compute.security_groups = self.secgroups_mock
+ self.secgroups_mock.reset_mock()
+
+ self.sg_rules_mock = mock.Mock()
+ self.sg_rules_mock.resource_class = fakes.FakeResource(None, {})
+ self.app.client_manager.compute.security_group_rules = \
+ self.sg_rules_mock
+ self.sg_rules_mock.reset_mock()
+
+
+class TestSecurityGroupRuleCreate(TestSecurityGroupRule):
+
+ def setUp(self):
+ super(TestSecurityGroupRuleCreate, self).setUp()
+
+ self.secgroups_mock.get.return_value = FakeSecurityGroupRuleResource(
+ None,
+ copy.deepcopy(SECURITY_GROUP),
+ loaded=True,
+ )
+
+ # Get the command object to test
+ self.cmd = security_group.CreateSecurityGroupRule(self.app, None)
+
+ def test_security_group_rule_create_no_options(self):
+ self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
+ None,
+ copy.deepcopy(SECURITY_GROUP_RULE),
+ loaded=True,
+ )
+
+ arglist = [
+ security_group_name,
+ ]
+ verifylist = [
+ ('group', security_group_name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # SecurityGroupManager.create(name, description)
+ self.sg_rules_mock.create.assert_called_with(
+ security_group_id,
+ 'tcp',
+ 0,
+ 0,
+ '0.0.0.0/0',
+ )
+
+ collist = (
+ 'group',
+ 'id',
+ 'ip_protocol',
+ 'ip_range',
+ 'parent_group_id',
+ 'port_range',
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ {},
+ security_group_rule_id,
+ 'tcp',
+ '',
+ security_group_id,
+ '0:0',
+ )
+ self.assertEqual(datalist, data)
+
+ def test_security_group_rule_create_ftp(self):
+ sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
+ sg_rule['from_port'] = 20
+ sg_rule['to_port'] = 21
+ self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
+ None,
+ sg_rule,
+ loaded=True,
+ )
+
+ arglist = [
+ security_group_name,
+ '--dst-port', '20:21',
+ ]
+ verifylist = [
+ ('group', security_group_name),
+ ('dst_port', (20, 21)),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # SecurityGroupManager.create(name, description)
+ self.sg_rules_mock.create.assert_called_with(
+ security_group_id,
+ 'tcp',
+ 20,
+ 21,
+ '0.0.0.0/0',
+ )
+
+ collist = (
+ 'group',
+ 'id',
+ 'ip_protocol',
+ 'ip_range',
+ 'parent_group_id',
+ 'port_range',
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ {},
+ security_group_rule_id,
+ 'tcp',
+ '',
+ security_group_id,
+ '20:21',
+ )
+ self.assertEqual(datalist, data)
+
+ def test_security_group_rule_create_ssh(self):
+ sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
+ sg_rule['from_port'] = 22
+ sg_rule['to_port'] = 22
+ self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
+ None,
+ sg_rule,
+ loaded=True,
+ )
+
+ arglist = [
+ security_group_name,
+ '--dst-port', '22',
+ ]
+ verifylist = [
+ ('group', security_group_name),
+ ('dst_port', (22, 22)),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # SecurityGroupManager.create(name, description)
+ self.sg_rules_mock.create.assert_called_with(
+ security_group_id,
+ 'tcp',
+ 22,
+ 22,
+ '0.0.0.0/0',
+ )
+
+ collist = (
+ 'group',
+ 'id',
+ 'ip_protocol',
+ 'ip_range',
+ 'parent_group_id',
+ 'port_range',
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ {},
+ security_group_rule_id,
+ 'tcp',
+ '',
+ security_group_id,
+ '22:22',
+ )
+ self.assertEqual(datalist, data)
+
+ def test_security_group_rule_create_udp(self):
+ sg_rule = copy.deepcopy(SECURITY_GROUP_RULE)
+ sg_rule['ip_protocol'] = 'udp'
+ self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
+ None,
+ sg_rule,
+ loaded=True,
+ )
+
+ arglist = [
+ security_group_name,
+ '--proto', 'udp',
+ ]
+ verifylist = [
+ ('group', security_group_name),
+ ('proto', 'udp'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # SecurityGroupManager.create(name, description)
+ self.sg_rules_mock.create.assert_called_with(
+ security_group_id,
+ 'udp',
+ 0,
+ 0,
+ '0.0.0.0/0',
+ )
+
+ collist = (
+ 'group',
+ 'id',
+ 'ip_protocol',
+ 'ip_range',
+ 'parent_group_id',
+ 'port_range',
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ {},
+ security_group_rule_id,
+ 'udp',
+ '',
+ security_group_id,
+ '0:0',
+ )
+ self.assertEqual(datalist, data)
+
+ def test_security_group_rule_create_icmp(self):
+ self.sg_rules_mock.create.return_value = FakeSecurityGroupRuleResource(
+ None,
+ copy.deepcopy(SECURITY_GROUP_RULE_ICMP),
+ loaded=True,
+ )
+
+ arglist = [
+ security_group_name,
+ '--proto', 'ICMP',
+ ]
+ verifylist = [
+ ('group', security_group_name),
+ ('proto', 'ICMP'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ # DisplayCommandBase.take_action() returns two tuples
+ columns, data = self.cmd.take_action(parsed_args)
+
+ # SecurityGroupManager.create(name, description)
+ self.sg_rules_mock.create.assert_called_with(
+ security_group_id,
+ 'ICMP',
+ -1,
+ -1,
+ '0.0.0.0/0',
+ )
+
+ collist = (
+ 'group',
+ 'id',
+ 'ip_protocol',
+ 'ip_range',
+ 'parent_group_id',
+ 'port_range',
+ )
+ self.assertEqual(collist, columns)
+ datalist = (
+ {},
+ security_group_rule_id,
+ 'icmp',
+ '',
+ security_group_id,
+ '',
+ )
+ self.assertEqual(datalist, data)