summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/network/v2/port.py73
-rw-r--r--openstackclient/tests/unit/network/v2/test_port.py220
2 files changed, 292 insertions, 1 deletions
diff --git a/openstackclient/network/v2/port.py b/openstackclient/network/v2/port.py
index 92b286a9..80d52862 100644
--- a/openstackclient/network/v2/port.py
+++ b/openstackclient/network/v2/port.py
@@ -281,7 +281,23 @@ class CreatePort(command.ShowOne):
help=_("Name of this port")
)
# TODO(singhj): Add support for extended options:
- # qos,security groups,dhcp, address pairs
+ # qos,dhcp, address pairs
+ secgroups = parser.add_mutually_exclusive_group()
+ secgroups.add_argument(
+ '--security-group',
+ metavar='<security-group>',
+ action='append',
+ dest='security_groups',
+ help=_("Security group to associate with this port (name or ID) "
+ "(repeat option to set multiple security groups)")
+ )
+ secgroups.add_argument(
+ '--no-security-group',
+ dest='no_security_group',
+ action='store_true',
+ help=_("Associate no security groups with this port")
+ )
+
return parser
def take_action(self, parsed_args):
@@ -291,6 +307,14 @@ class CreatePort(command.ShowOne):
parsed_args.network = _network.id
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = _get_attrs(self.app.client_manager, parsed_args)
+
+ if parsed_args.security_groups:
+ attrs['security_groups'] = [client.find_security_group(
+ sg, ignore_missing=False).id
+ for sg in parsed_args.security_groups]
+ if parsed_args.no_security_group:
+ attrs['security_groups'] = []
+
obj = client.create_port(**attrs)
columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns, formatters=_formatters)
@@ -463,6 +487,21 @@ class SetPort(command.Command):
metavar="<port>",
help=_("Port to modify (name or ID)")
)
+ parser.add_argument(
+ '--security-group',
+ metavar='<security-group>',
+ action='append',
+ dest='security_groups',
+ help=_("Security group to associate with this port (name or ID) "
+ "(repeat option to set multiple security groups)")
+ )
+ parser.add_argument(
+ '--no-security-group',
+ dest='no_security_group',
+ action='store_true',
+ help=_("Clear existing security groups associated with this port")
+ )
+
return parser
def take_action(self, parsed_args):
@@ -490,6 +529,17 @@ class SetPort(command.Command):
attrs['fixed_ips'] += [ip for ip in obj.fixed_ips if ip]
elif parsed_args.no_fixed_ip:
attrs['fixed_ips'] = []
+ if parsed_args.security_groups and parsed_args.no_security_group:
+ attrs['security_groups'] = [client.find_security_group(sg,
+ ignore_missing=False).id
+ for sg in parsed_args.security_groups]
+ elif parsed_args.security_groups:
+ attrs['security_groups'] = obj.security_groups
+ for sg in parsed_args.security_groups:
+ sg_id = client.find_security_group(sg, ignore_missing=False).id
+ attrs['security_groups'].append(sg_id)
+ elif parsed_args.no_security_group:
+ attrs['security_groups'] = []
client.update_port(obj, **attrs)
@@ -536,6 +586,15 @@ class UnsetPort(command.Command):
help=_("Desired key which should be removed from binding:profile"
"(repeat option to unset multiple binding:profile data)"))
parser.add_argument(
+ '--security-group',
+ metavar='<security-group>',
+ action='append',
+ dest='security_groups',
+ help=_("Security group which should be removed this port (name "
+ "or ID) (repeat option to unset multiple security groups)")
+ )
+
+ parser.add_argument(
'port',
metavar="<port>",
help=_("Port to modify (name or ID)")
@@ -550,6 +609,7 @@ class UnsetPort(command.Command):
# Unset* classes
tmp_fixed_ips = copy.deepcopy(obj.fixed_ips)
tmp_binding_profile = copy.deepcopy(obj.binding_profile)
+ tmp_secgroups = copy.deepcopy(obj.security_groups)
_prepare_fixed_ips(self.app.client_manager, parsed_args)
attrs = {}
if parsed_args.fixed_ip:
@@ -568,5 +628,16 @@ class UnsetPort(command.Command):
msg = _("Port does not contain binding-profile %s") % key
raise exceptions.CommandError(msg)
attrs['binding:profile'] = tmp_binding_profile
+ if parsed_args.security_groups:
+ try:
+ for sg in parsed_args.security_groups:
+ sg_id = client.find_security_group(
+ sg, ignore_missing=False).id
+ tmp_secgroups.remove(sg_id)
+ except ValueError:
+ msg = _("Port does not contain security group %s") % sg
+ raise exceptions.CommandError(msg)
+ attrs['security_groups'] = tmp_secgroups
+
if attrs:
client.update_port(obj, **attrs)
diff --git a/openstackclient/tests/unit/network/v2/test_port.py b/openstackclient/tests/unit/network/v2/test_port.py
index a2aceab1..b27e71b5 100644
--- a/openstackclient/tests/unit/network/v2/test_port.py
+++ b/openstackclient/tests/unit/network/v2/test_port.py
@@ -228,6 +228,93 @@ class TestCreatePort(TestPort):
self.assertEqual(ref_columns, columns)
self.assertEqual(ref_data, data)
+ def test_create_with_security_group(self):
+ secgroup = network_fakes.FakeSecurityGroup.create_one_security_group()
+ self.network.find_security_group = mock.Mock(return_value=secgroup)
+ arglist = [
+ '--network', self._port.network_id,
+ '--security-group', secgroup.id,
+ 'test-port',
+ ]
+ verifylist = [
+ ('network', self._port.network_id,),
+ ('enable', True),
+ ('security_groups', [secgroup.id]),
+ ('name', 'test-port'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'network_id': self._port.network_id,
+ 'security_groups': [secgroup.id],
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+ def test_create_with_security_groups(self):
+ sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ self.network.find_security_group = mock.Mock(side_effect=[sg_1, sg_2])
+ arglist = [
+ '--network', self._port.network_id,
+ '--security-group', sg_1.id,
+ '--security-group', sg_2.id,
+ 'test-port',
+ ]
+ verifylist = [
+ ('network', self._port.network_id,),
+ ('enable', True),
+ ('security_groups', [sg_1.id, sg_2.id]),
+ ('name', 'test-port'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'network_id': self._port.network_id,
+ 'security_groups': [sg_1.id, sg_2.id],
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
+ def test_create_with_no_secuirty_groups(self):
+ arglist = [
+ '--network', self._port.network_id,
+ '--no-security-group',
+ 'test-port',
+ ]
+ verifylist = [
+ ('network', self._port.network_id),
+ ('enable', True),
+ ('no_security_group', True),
+ ('name', 'test-port'),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_port.assert_called_once_with(**{
+ 'admin_state_up': True,
+ 'network_id': self._port.network_id,
+ 'security_groups': [],
+ 'name': 'test-port',
+ })
+
+ ref_columns, ref_data = self._get_common_cols_data(self._port)
+ self.assertEqual(ref_columns, columns)
+ self.assertEqual(ref_data, data)
+
class TestDeletePort(TestPort):
@@ -651,6 +738,95 @@ class TestSetPort(TestPort):
self.network.update_port.assert_called_once_with(self._port, **attrs)
self.assertIsNone(result)
+ def test_set_security_group(self):
+ sg = network_fakes.FakeSecurityGroup.create_one_security_group()
+ self.network.find_security_group = mock.Mock(return_value=sg)
+ arglist = [
+ '--security-group', sg.id,
+ self._port.name,
+ ]
+ verifylist = [
+ ('security_groups', [sg.id]),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'security_groups': [sg.id],
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_append_security_group(self):
+ sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ sg_3 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ self.network.find_security_group = mock.Mock(side_effect=[sg_2, sg_3])
+ _testport = network_fakes.FakePort.create_one_port(
+ {'security_groups': [sg_1.id]})
+ self.network.find_port = mock.Mock(return_value=_testport)
+ arglist = [
+ '--security-group', sg_2.id,
+ '--security-group', sg_3.id,
+ _testport.name,
+ ]
+ verifylist = [
+ ('security_groups', [sg_2.id, sg_3.id]),
+ ('port', _testport.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'security_groups': [sg_1.id, sg_2.id, sg_3.id],
+ }
+ self.network.update_port.assert_called_once_with(_testport, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_no_security_groups(self):
+ arglist = [
+ '--no-security-group',
+ self._port.name,
+ ]
+ verifylist = [
+ ('no_security_group', True),
+ ('port', self._port.name),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'security_groups': [],
+ }
+ self.network.update_port.assert_called_once_with(self._port, **attrs)
+ self.assertIsNone(result)
+
+ def test_overwrite_security_group(self):
+ sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ _testport = network_fakes.FakePort.create_one_port(
+ {'security_groups': [sg1.id]})
+ self.network.find_port = mock.Mock(return_value=_testport)
+ self.network.find_security_group = mock.Mock(return_value=sg2)
+ arglist = [
+ '--security-group', sg2.id,
+ '--no-security-group',
+ _testport.name,
+ ]
+ verifylist = [
+ ('security_groups', [sg2.id]),
+ ('no_security_group', True)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ attrs = {
+ 'security_groups': [sg2.id],
+ }
+ self.network.update_port.assert_called_once_with(_testport, **attrs)
+ self.assertIsNone(result)
+
class TestShowPort(TestPort):
@@ -767,3 +943,47 @@ class TestUnsetPort(TestPort):
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
+
+ def test_unset_security_group(self):
+ _fake_sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ _fake_sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ _fake_port = network_fakes.FakePort.create_one_port(
+ {'security_groups': [_fake_sg1.id, _fake_sg2.id]})
+ self.network.find_port = mock.Mock(return_value=_fake_port)
+ self.network.find_security_group = mock.Mock(return_value=_fake_sg2)
+ arglist = [
+ '--security-group', _fake_sg2.id,
+ _fake_port.name,
+ ]
+ verifylist = [
+ ('security_groups', [_fake_sg2.id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'security_groups': [_fake_sg1.id]
+ }
+ self.network.update_port.assert_called_once_with(
+ _fake_port, **attrs)
+ self.assertIsNone(result)
+
+ def test_unset_port_security_group_not_existent(self):
+ _fake_sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ _fake_sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
+ _fake_port = network_fakes.FakePort.create_one_port(
+ {'security_groups': [_fake_sg1.id]})
+ self.network.find_security_group = mock.Mock(return_value=_fake_sg2)
+ arglist = [
+ '--security-group', _fake_sg2.id,
+ _fake_port.name,
+ ]
+ verifylist = [
+ ('security_groups', [_fake_sg2.id]),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.assertRaises(exceptions.CommandError,
+ self.cmd.take_action,
+ parsed_args)