summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/network/v2/security_group_rule.py86
-rw-r--r--openstackclient/tests/network/v2/fakes.py14
-rw-r--r--openstackclient/tests/network/v2/test_security_group_rule.py112
3 files changed, 208 insertions, 4 deletions
diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py
index beeeaff7..a61e3233 100644
--- a/openstackclient/network/v2/security_group_rule.py
+++ b/openstackclient/network/v2/security_group_rule.py
@@ -13,9 +13,54 @@
"""Security Group Rule action implementations"""
+import six
+
+from openstackclient.common import exceptions
+from openstackclient.common import utils
from openstackclient.network import common
+def _xform_security_group_rule(sgroup):
+ info = {}
+ info.update(sgroup)
+ from_port = info.pop('from_port')
+ to_port = info.pop('to_port')
+ if isinstance(from_port, int) and isinstance(to_port, int):
+ port_range = {'port_range': "%u:%u" % (from_port, to_port)}
+ elif from_port is None and to_port is None:
+ port_range = {'port_range': ""}
+ else:
+ port_range = {'port_range': "%s:%s" % (from_port, to_port)}
+ info.update(port_range)
+ if 'cidr' in info['ip_range']:
+ info['ip_range'] = info['ip_range']['cidr']
+ else:
+ info['ip_range'] = ''
+ if info['ip_protocol'] is None:
+ info['ip_protocol'] = ''
+ elif info['ip_protocol'].lower() == 'icmp':
+ info['port_range'] = ''
+ group = info.pop('group')
+ if 'name' in group:
+ info['remote_security_group'] = group['name']
+ else:
+ info['remote_security_group'] = ''
+ return info
+
+
+def _format_security_group_rule_show(obj):
+ data = _xform_security_group_rule(obj)
+ return zip(*sorted(six.iteritems(data)))
+
+
+def _get_columns(item):
+ columns = item.keys()
+ if 'tenant_id' in columns:
+ columns.remove('tenant_id')
+ columns.append('project_id')
+ return tuple(sorted(columns))
+
+
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
@@ -33,3 +78,44 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
def take_action_compute(self, client, parsed_args):
client.security_group_rules.delete(parsed_args.rule)
+
+
+class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
+ """Display security group rule details"""
+
+ def update_parser_common(self, parser):
+ parser.add_argument(
+ 'rule',
+ metavar="<rule>",
+ help="Security group rule to display (ID only)"
+ )
+ return parser
+
+ def take_action_network(self, client, parsed_args):
+ obj = client.find_security_group_rule(parsed_args.rule,
+ ignore_missing=False)
+ columns = _get_columns(obj)
+ data = utils.get_item_properties(obj, columns)
+ return (columns, data)
+
+ def take_action_compute(self, client, parsed_args):
+ # NOTE(rtheis): Unfortunately, compute does not have an API
+ # to get or list security group rules so parse through the
+ # security groups to find all accessible rules in search of
+ # the requested rule.
+ obj = None
+ security_group_rules = []
+ for security_group in client.security_groups.list():
+ security_group_rules.extend(security_group.rules)
+ for security_group_rule in security_group_rules:
+ if parsed_args.rule == str(security_group_rule.get('id')):
+ obj = security_group_rule
+ break
+
+ if obj is None:
+ msg = "Could not find security group rule " \
+ "with ID %s" % parsed_args.rule
+ raise exceptions.CommandError(msg)
+
+ # NOTE(rtheis): Format security group rule
+ return _format_security_group_rule_show(obj)
diff --git a/openstackclient/tests/network/v2/fakes.py b/openstackclient/tests/network/v2/fakes.py
index c24410e1..b1784b60 100644
--- a/openstackclient/tests/network/v2/fakes.py
+++ b/openstackclient/tests/network/v2/fakes.py
@@ -480,15 +480,13 @@ class FakeSecurityGroupRule(object):
:param Dictionary methods:
A dictionary with all methods
:return:
- A FakeResource object, with id, name, etc.
+ A FakeResource object, with id, etc.
"""
# Set default attributes.
security_group_rule_attrs = {
- 'description': 'security-group-rule-desc-' + uuid.uuid4().hex,
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
- 'name': 'security-group-rule-name-' + uuid.uuid4().hex,
'port_range_max': None,
'port_range_min': None,
'protocol': None,
@@ -502,7 +500,11 @@ class FakeSecurityGroupRule(object):
security_group_rule_attrs.update(attrs)
# Set default methods.
- security_group_rule_methods = {}
+ security_group_rule_methods = {
+ 'keys': ['direction', 'ethertype', 'id', 'port_range_max',
+ 'port_range_min', 'protocol', 'remote_group_id',
+ 'remote_ip_prefix', 'security_group_id', 'tenant_id'],
+ }
# Overwrite default methods.
security_group_rule_methods.update(methods)
@@ -511,6 +513,10 @@ class FakeSecurityGroupRule(object):
info=copy.deepcopy(security_group_rule_attrs),
methods=copy.deepcopy(security_group_rule_methods),
loaded=True)
+
+ # Set attributes with special mappings.
+ security_group_rule.project_id = security_group_rule_attrs['tenant_id']
+
return security_group_rule
@staticmethod
diff --git a/openstackclient/tests/network/v2/test_security_group_rule.py b/openstackclient/tests/network/v2/test_security_group_rule.py
index c6ef3884..db15d0e2 100644
--- a/openstackclient/tests/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/network/v2/test_security_group_rule.py
@@ -11,11 +11,14 @@
# under the License.
#
+import copy
import mock
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.compute.v2 import fakes as compute_fakes
+from openstackclient.tests import fakes
from openstackclient.tests.network.v2 import fakes as network_fakes
+from openstackclient.tests import utils as tests_utils
class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
@@ -98,3 +101,112 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.compute.security_group_rules.delete.assert_called_with(
self._security_group_rule.id)
self.assertIsNone(result)
+
+
+class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
+
+ # The security group rule to be shown.
+ _security_group_rule = \
+ network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ columns = (
+ 'direction',
+ 'ethertype',
+ 'id',
+ 'port_range_max',
+ 'port_range_min',
+ 'project_id',
+ 'protocol',
+ 'remote_group_id',
+ 'remote_ip_prefix',
+ 'security_group_id',
+ )
+
+ data = (
+ _security_group_rule.direction,
+ _security_group_rule.ethertype,
+ _security_group_rule.id,
+ _security_group_rule.port_range_max,
+ _security_group_rule.port_range_min,
+ _security_group_rule.project_id,
+ _security_group_rule.protocol,
+ _security_group_rule.remote_group_id,
+ _security_group_rule.remote_ip_prefix,
+ _security_group_rule.security_group_id,
+ )
+
+ def setUp(self):
+ super(TestShowSecurityGroupRuleNetwork, self).setUp()
+
+ self.network.find_security_group_rule = mock.Mock(
+ return_value=self._security_group_rule)
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ShowSecurityGroupRule(
+ self.app, self.namespace)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group_rule.id,
+ ]
+ verifylist = [
+ ('rule', self._security_group_rule.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_security_group_rule.assert_called_with(
+ self._security_group_rule.id, ignore_missing=False)
+ self.assertEqual(tuple(self.columns), columns)
+ self.assertEqual(self.data, data)
+
+
+class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
+
+ # The security group rule to be shown.
+ _security_group_rule = \
+ compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
+
+ columns, data = \
+ security_group_rule._format_security_group_rule_show(
+ _security_group_rule._info)
+
+ def setUp(self):
+ super(TestShowSecurityGroupRuleCompute, self).setUp()
+
+ self.app.client_manager.network_endpoint_enabled = False
+
+ # Build a security group fake customized for this test.
+ security_group_rules = [self._security_group_rule._info]
+ security_group = fakes.FakeResource(
+ info=copy.deepcopy({'rules': security_group_rules}),
+ loaded=True)
+ security_group.rules = security_group_rules
+ self.compute.security_groups.list.return_value = [security_group]
+
+ # Get the command object to test
+ self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
+
+ def test_show_no_options(self):
+ self.assertRaises(tests_utils.ParserException,
+ self.check_parser, self.cmd, [], [])
+
+ def test_show_all_options(self):
+ arglist = [
+ self._security_group_rule.id,
+ ]
+ verifylist = [
+ ('rule', self._security_group_rule.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.compute.security_groups.list.assert_called_with()
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)