summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-02-18 03:36:06 +0000
committerGerrit Code Review <review@openstack.org>2017-02-18 03:36:06 +0000
commitc88975136d675c947167533191b75c849cd0a1b5 (patch)
treee592862adb330eb0184b452e3355a0ffb7eb2405 /openstackclient
parentef1a86a802149e0a62c68fb93edf66b802bc72d1 (diff)
parentb2fd8ba869cd4b8e927118f7712d0ed7fb60309f (diff)
downloadpython-openstackclient-c88975136d675c947167533191b75c849cd0a1b5.tar.gz
Merge "Add "encryption-*" options in volume type commands"
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/tests/functional/volume/v1/test_volume_type.py71
-rw-r--r--openstackclient/tests/functional/volume/v2/test_volume_type.py87
-rw-r--r--openstackclient/tests/unit/volume/v1/fakes.py31
-rw-r--r--openstackclient/tests/unit/volume/v1/test_type.py237
-rw-r--r--openstackclient/tests/unit/volume/v2/fakes.py31
-rw-r--r--openstackclient/tests/unit/volume/v2/test_type.py276
-rw-r--r--openstackclient/volume/v1/volume_type.py221
-rw-r--r--openstackclient/volume/v2/volume_type.py210
8 files changed, 1145 insertions, 19 deletions
diff --git a/openstackclient/tests/functional/volume/v1/test_volume_type.py b/openstackclient/tests/functional/volume/v1/test_volume_type.py
index 955759b6..d1842795 100644
--- a/openstackclient/tests/functional/volume/v1/test_volume_type.py
+++ b/openstackclient/tests/functional/volume/v1/test_volume_type.py
@@ -87,3 +87,74 @@ class VolumeTypeTests(common.BaseVolumeTests):
time.sleep(5)
raw_output = self.openstack(cmd)
self.assertOutput('', raw_output)
+
+ # NOTE: Add some basic funtional tests with the old format to
+ # make sure the command works properly, need to change
+ # these to new test format when beef up all tests for
+ # volume tye commands.
+ def test_encryption_type(self):
+ encryption_type = uuid.uuid4().hex
+ # test create new encryption type
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type create '
+ '--encryption-provider LuksEncryptor '
+ '--encryption-cipher aes-xts-plain64 '
+ '--encryption-key-size 128 '
+ '--encryption-control-location front-end ' +
+ encryption_type + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test show encryption type
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + encryption_type + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test list encryption type
+ opts = self.get_opts(['Encryption'])
+ raw_output = self.openstack(
+ 'volume type list --encryption-type ' + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test set new encryption type
+ raw_output = self.openstack(
+ 'volume type set '
+ '--encryption-provider LuksEncryptor '
+ '--encryption-cipher aes-xts-plain64 '
+ '--encryption-key-size 128 '
+ '--encryption-control-location front-end ' +
+ self.NAME)
+ self.assertEqual('', raw_output)
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + self.NAME + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test unset encryption type
+ raw_output = self.openstack(
+ 'volume type unset --encryption-type ' + self.NAME)
+ self.assertEqual('', raw_output)
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + self.NAME + opts)
+ self.assertEqual('\n', raw_output)
+ # test delete encryption type
+ raw_output = self.openstack('volume type delete ' + encryption_type)
+ self.assertEqual('', raw_output)
diff --git a/openstackclient/tests/functional/volume/v2/test_volume_type.py b/openstackclient/tests/functional/volume/v2/test_volume_type.py
index b4df5b2d..a5d0a767 100644
--- a/openstackclient/tests/functional/volume/v2/test_volume_type.py
+++ b/openstackclient/tests/functional/volume/v2/test_volume_type.py
@@ -102,3 +102,90 @@ class VolumeTypeTests(common.BaseVolumeTests):
time.sleep(5)
raw_output = self.openstack(cmd)
self.assertOutput('', raw_output)
+
+ # NOTE: Add some basic funtional tests with the old format to
+ # make sure the command works properly, need to change
+ # these to new test format when beef up all tests for
+ # volume tye commands.
+ def test_encryption_type(self):
+ encryption_type = uuid.uuid4().hex
+ # test create new encryption type
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type create '
+ '--encryption-provider LuksEncryptor '
+ '--encryption-cipher aes-xts-plain64 '
+ '--encryption-key-size 128 '
+ '--encryption-control-location front-end ' +
+ encryption_type + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test show encryption type
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + encryption_type + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test list encryption type
+ opts = self.get_opts(['Encryption'])
+ raw_output = self.openstack(
+ 'volume type list --encryption-type ' + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test set existing encryption type
+ raw_output = self.openstack(
+ 'volume type set '
+ '--encryption-key-size 256 '
+ '--encryption-control-location back-end ' +
+ encryption_type)
+ self.assertEqual('', raw_output)
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + encryption_type + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='256'",
+ "control_location='back-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test set new encryption type
+ raw_output = self.openstack(
+ 'volume type set '
+ '--encryption-provider LuksEncryptor '
+ '--encryption-cipher aes-xts-plain64 '
+ '--encryption-key-size 128 '
+ '--encryption-control-location front-end ' +
+ self.NAME)
+ self.assertEqual('', raw_output)
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + self.NAME + opts)
+ expected = ["provider='LuksEncryptor'",
+ "cipher='aes-xts-plain64'",
+ "key_size='128'",
+ "control_location='front-end'"]
+ for attr in expected:
+ self.assertIn(attr, raw_output)
+ # test unset encryption type
+ raw_output = self.openstack(
+ 'volume type unset --encryption-type ' + self.NAME)
+ self.assertEqual('', raw_output)
+ opts = self.get_opts(['encryption'])
+ raw_output = self.openstack(
+ 'volume type show --encryption-type ' + self.NAME + opts)
+ self.assertEqual('\n', raw_output)
+ # test delete encryption type
+ raw_output = self.openstack('volume type delete ' + encryption_type)
+ self.assertEqual('', raw_output)
diff --git a/openstackclient/tests/unit/volume/v1/fakes.py b/openstackclient/tests/unit/volume/v1/fakes.py
index 78a8227e..fff5181d 100644
--- a/openstackclient/tests/unit/volume/v1/fakes.py
+++ b/openstackclient/tests/unit/volume/v1/fakes.py
@@ -364,6 +364,9 @@ class FakeVolumev1Client(object):
self.qos_specs.resource_class = fakes.FakeResource(None, {})
self.volume_types = mock.Mock()
self.volume_types.resource_class = fakes.FakeResource(None, {})
+ self.volume_encryption_types = mock.Mock()
+ self.volume_encryption_types.resource_class = (
+ fakes.FakeResource(None, {}))
self.transfers = mock.Mock()
self.transfers.resource_class = fakes.FakeResource(None, {})
self.volume_snapshots = mock.Mock()
@@ -470,6 +473,34 @@ class FakeType(object):
return mock.Mock(side_effect=types)
+ @staticmethod
+ def create_one_encryption_type(attrs=None):
+ """Create a fake encryption type.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with volume_type_id etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ encryption_info = {
+ "volume_type_id": 'type-id-' + uuid.uuid4().hex,
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+
+ # Overwrite default attributes.
+ encryption_info.update(attrs)
+
+ encryption_type = fakes.FakeResource(
+ info=copy.deepcopy(encryption_info),
+ loaded=True)
+ return encryption_type
+
class FakeSnapshot(object):
"""Fake one or more snapshot."""
diff --git a/openstackclient/tests/unit/volume/v1/test_type.py b/openstackclient/tests/unit/volume/v1/test_type.py
index 81ad8301..dcdd3d56 100644
--- a/openstackclient/tests/unit/volume/v1/test_type.py
+++ b/openstackclient/tests/unit/volume/v1/test_type.py
@@ -31,6 +31,10 @@ class TestType(volume_fakes.TestVolumev1):
self.types_mock = self.app.client_manager.volume.volume_types
self.types_mock.reset_mock()
+ self.encryption_types_mock = (
+ self.app.client_manager.volume.volume_encryption_types)
+ self.encryption_types_mock.reset_mock()
+
class TestTypeCreate(TestType):
@@ -75,6 +79,67 @@ class TestTypeCreate(TestType):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
+ def test_type_create_with_encryption(self):
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': '128',
+ 'control_location': 'front-end',
+ }
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type(
+ attrs=encryption_info
+ )
+ self.new_volume_type = volume_fakes.FakeType.create_one_type(
+ attrs={'encryption': encryption_info})
+ self.types_mock.create.return_value = self.new_volume_type
+ self.encryption_types_mock.create.return_value = encryption_type
+ encryption_columns = (
+ 'description',
+ 'encryption',
+ 'id',
+ 'is_public',
+ 'name',
+ )
+ encryption_data = (
+ self.new_volume_type.description,
+ utils.format_dict(encryption_info),
+ self.new_volume_type.id,
+ True,
+ self.new_volume_type.name,
+ )
+ arglist = [
+ '--encryption-provider', 'LuksEncryptor',
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.new_volume_type.name,
+ ]
+ verifylist = [
+ ('encryption_provider', 'LuksEncryptor'),
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('name', self.new_volume_type.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.types_mock.create.assert_called_with(
+ self.new_volume_type.name,
+ )
+ body = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.create.assert_called_with(
+ self.new_volume_type,
+ body,
+ )
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, data)
+
class TestTypeDelete(TestType):
@@ -156,17 +221,17 @@ class TestTypeList(TestType):
volume_types = volume_fakes.FakeType.create_types()
- columns = (
+ columns = [
"ID",
"Name",
"Is Public",
- )
- columns_long = (
+ ]
+ columns_long = [
"ID",
"Name",
"Is Public",
"Properties"
- )
+ ]
data = []
for t in volume_types:
@@ -188,6 +253,8 @@ class TestTypeList(TestType):
super(TestTypeList, self).setUp()
self.types_mock.list.return_value = self.volume_types
+ self.encryption_types_mock.create.return_value = None
+ self.encryption_types_mock.update.return_value = None
# get the command to test
self.cmd = volume_type.ListVolumeType(self.app, None)
@@ -195,6 +262,7 @@ class TestTypeList(TestType):
arglist = []
verifylist = [
("long", False),
+ ("encryption_type", False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -217,6 +285,47 @@ class TestTypeList(TestType):
self.assertEqual(self.columns_long, columns)
self.assertEqual(self.data_long, list(data))
+ def test_type_list_with_encryption(self):
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type(
+ attrs={'volume_type_id': self.volume_types[0].id})
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+ encryption_columns = self.columns + [
+ "Encryption",
+ ]
+ encryption_data = []
+ encryption_data.append((
+ self.volume_types[0].id,
+ self.volume_types[0].name,
+ self.volume_types[0].is_public,
+ utils.format_dict(encryption_info),
+ ))
+ encryption_data.append((
+ self.volume_types[1].id,
+ self.volume_types[1].name,
+ self.volume_types[1].is_public,
+ '-',
+ ))
+
+ self.encryption_types_mock.list.return_value = [encryption_type]
+ arglist = [
+ "--encryption-type",
+ ]
+ verifylist = [
+ ("encryption_type", True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.encryption_types_mock.list.assert_called_once_with()
+ self.types_mock.list.assert_called_once_with()
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, list(data))
+
class TestTypeSet(TestType):
@@ -260,6 +369,60 @@ class TestTypeSet(TestType):
{'myprop': 'myvalue'})
self.assertIsNone(result)
+ def test_type_set_new_encryption(self):
+ arglist = [
+ '--encryption-provider', 'LuksEncryptor',
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_provider', 'LuksEncryptor'),
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ body = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.create.assert_called_with(
+ self.volume_type,
+ body,
+ )
+ self.assertIsNone(result)
+
+ def test_type_set_new_encryption_without_provider(self):
+ arglist = [
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual("Command Failed: One or more of"
+ " the operations failed",
+ str(e))
+ self.encryption_types_mock.create.assert_not_called()
+ self.encryption_types_mock.update.assert_not_called()
+
class TestTypeShow(TestType):
@@ -293,7 +456,8 @@ class TestTypeShow(TestType):
self.volume_type.id
]
verifylist = [
- ("volume_type", self.volume_type.id)
+ ("volume_type", self.volume_type.id),
+ ("encryption_type", False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -303,6 +467,50 @@ class TestTypeShow(TestType):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
+ def test_type_show_with_encryption(self):
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type()
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+ self.volume_type = volume_fakes.FakeType.create_one_type(
+ attrs={'encryption': encryption_info})
+ self.types_mock.get.return_value = self.volume_type
+ self.encryption_types_mock.get.return_value = encryption_type
+ encryption_columns = (
+ 'description',
+ 'encryption',
+ 'id',
+ 'is_public',
+ 'name',
+ 'properties',
+ )
+ encryption_data = (
+ self.volume_type.description,
+ utils.format_dict(encryption_info),
+ self.volume_type.id,
+ True,
+ self.volume_type.name,
+ utils.format_dict(self.volume_type.extra_specs)
+ )
+ arglist = [
+ '--encryption-type',
+ self.volume_type.id
+ ]
+ verifylist = [
+ ('encryption_type', True),
+ ("volume_type", self.volume_type.id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.types_mock.get.assert_called_with(self.volume_type.id)
+ self.encryption_types_mock.get.assert_called_with(self.volume_type.id)
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, data)
+
class TestTypeUnset(TestType):
@@ -317,13 +525,14 @@ class TestTypeUnset(TestType):
# Get the command object to test
self.cmd = volume_type.UnsetVolumeType(self.app, None)
- def test_type_unset(self):
+ def test_type_unset_property(self):
arglist = [
'--property', 'property',
'--property', 'multi_property',
self.volume_type.id,
]
verifylist = [
+ ('encryption_type', False),
('property', ['property', 'multi_property']),
('volume_type', self.volume_type.id),
]
@@ -333,6 +542,7 @@ class TestTypeUnset(TestType):
result = self.cmd.take_action(parsed_args)
self.volume_type.unset_keys.assert_called_once_with(
['property', 'multi_property'])
+ self.encryption_types_mock.delete.assert_not_called()
self.assertIsNone(result)
def test_type_unset_failed_with_missing_volume_type_argument(self):
@@ -362,3 +572,18 @@ class TestTypeUnset(TestType):
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
+
+ def test_type_unset_encryption_type(self):
+ arglist = [
+ '--encryption-type',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_type', True),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.encryption_types_mock.delete.assert_called_with(self.volume_type)
+ self.assertIsNone(result)
diff --git a/openstackclient/tests/unit/volume/v2/fakes.py b/openstackclient/tests/unit/volume/v2/fakes.py
index a6676403..d54faec7 100644
--- a/openstackclient/tests/unit/volume/v2/fakes.py
+++ b/openstackclient/tests/unit/volume/v2/fakes.py
@@ -208,6 +208,9 @@ class FakeVolumeClient(object):
self.volume_types.resource_class = fakes.FakeResource(None, {})
self.volume_type_access = mock.Mock()
self.volume_type_access.resource_class = fakes.FakeResource(None, {})
+ self.volume_encryption_types = mock.Mock()
+ self.volume_encryption_types.resource_class = (
+ fakes.FakeResource(None, {}))
self.restores = mock.Mock()
self.restores.resource_class = fakes.FakeResource(None, {})
self.qos_specs = mock.Mock()
@@ -923,3 +926,31 @@ class FakeType(object):
types = FakeType.create_types(count)
return mock.Mock(side_effect=types)
+
+ @staticmethod
+ def create_one_encryption_type(attrs=None):
+ """Create a fake encryption type.
+
+ :param Dictionary attrs:
+ A dictionary with all attributes
+ :return:
+ A FakeResource object with volume_type_id etc.
+ """
+ attrs = attrs or {}
+
+ # Set default attributes.
+ encryption_info = {
+ "volume_type_id": 'type-id-' + uuid.uuid4().hex,
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+
+ # Overwrite default attributes.
+ encryption_info.update(attrs)
+
+ encryption_type = fakes.FakeResource(
+ info=copy.deepcopy(encryption_info),
+ loaded=True)
+ return encryption_type
diff --git a/openstackclient/tests/unit/volume/v2/test_type.py b/openstackclient/tests/unit/volume/v2/test_type.py
index cec01bd8..4023d55b 100644
--- a/openstackclient/tests/unit/volume/v2/test_type.py
+++ b/openstackclient/tests/unit/volume/v2/test_type.py
@@ -36,6 +36,10 @@ class TestType(volume_fakes.TestVolume):
self.app.client_manager.volume.volume_type_access)
self.types_access_mock.reset_mock()
+ self.encryption_types_mock = (
+ self.app.client_manager.volume.volume_encryption_types)
+ self.encryption_types_mock.reset_mock()
+
self.projects_mock = self.app.client_manager.identity.projects
self.projects_mock.reset_mock()
@@ -131,6 +135,68 @@ class TestTypeCreate(TestType):
self.cmd.take_action,
parsed_args)
+ def test_type_create_with_encryption(self):
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': '128',
+ 'control_location': 'front-end',
+ }
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type(
+ attrs=encryption_info
+ )
+ self.new_volume_type = volume_fakes.FakeType.create_one_type(
+ attrs={'encryption': encryption_info})
+ self.types_mock.create.return_value = self.new_volume_type
+ self.encryption_types_mock.create.return_value = encryption_type
+ encryption_columns = (
+ 'description',
+ 'encryption',
+ 'id',
+ 'is_public',
+ 'name',
+ )
+ encryption_data = (
+ self.new_volume_type.description,
+ utils.format_dict(encryption_info),
+ self.new_volume_type.id,
+ True,
+ self.new_volume_type.name,
+ )
+ arglist = [
+ '--encryption-provider', 'LuksEncryptor',
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.new_volume_type.name,
+ ]
+ verifylist = [
+ ('encryption_provider', 'LuksEncryptor'),
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('name', self.new_volume_type.name),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.types_mock.create.assert_called_with(
+ self.new_volume_type.name,
+ description=None,
+ )
+ body = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.create.assert_called_with(
+ self.new_volume_type,
+ body,
+ )
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, data)
+
class TestTypeDelete(TestType):
@@ -305,6 +371,7 @@ class TestTypeList(TestType):
"--default",
]
verifylist = [
+ ("encryption_type", False),
("long", False),
("private", False),
("public", False),
@@ -317,6 +384,47 @@ class TestTypeList(TestType):
self.assertEqual(self.columns, columns)
self.assertEqual(self.data_with_default_type, list(data))
+ def test_type_list_with_encryption(self):
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type(
+ attrs={'volume_type_id': self.volume_types[0].id})
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+ encryption_columns = self.columns + [
+ "Encryption",
+ ]
+ encryption_data = []
+ encryption_data.append((
+ self.volume_types[0].id,
+ self.volume_types[0].name,
+ self.volume_types[0].is_public,
+ utils.format_dict(encryption_info),
+ ))
+ encryption_data.append((
+ self.volume_types[1].id,
+ self.volume_types[1].name,
+ self.volume_types[1].is_public,
+ '-',
+ ))
+
+ self.encryption_types_mock.list.return_value = [encryption_type]
+ arglist = [
+ "--encryption-type",
+ ]
+ verifylist = [
+ ("encryption_type", True),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.encryption_types_mock.list.assert_called_once_with()
+ self.types_mock.list.assert_called_once_with(is_public=None)
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, list(data))
+
class TestTypeSet(TestType):
@@ -331,6 +439,8 @@ class TestTypeSet(TestType):
# Return a project
self.projects_mock.get.return_value = self.project
+ self.encryption_types_mock.create.return_value = None
+ self.encryption_types_mock.update.return_value = None
# Get the command object to test
self.cmd = volume_type.SetVolumeType(self.app, None)
@@ -454,6 +564,107 @@ class TestTypeSet(TestType):
self.project.id,
)
+ def test_type_set_new_encryption(self):
+ self.encryption_types_mock.update.side_effect = (
+ exceptions.NotFound('NotFound'))
+ arglist = [
+ '--encryption-provider', 'LuksEncryptor',
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_provider', 'LuksEncryptor'),
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ body = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.update.assert_called_with(
+ self.volume_type,
+ body,
+ )
+ self.encryption_types_mock.create.assert_called_with(
+ self.volume_type,
+ body,
+ )
+ self.assertIsNone(result)
+
+ @mock.patch.object(utils, 'find_resource')
+ def test_type_set_existing_encryption(self, mock_find):
+ mock_find.side_effect = [self.volume_type,
+ "existing_encryption_type"]
+ arglist = [
+ '--encryption-provider', 'LuksEncryptor',
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-control-location', 'front-end',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_provider', 'LuksEncryptor'),
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_control_location', 'front-end'),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ body = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': 'aes-xts-plain64',
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.update.assert_called_with(
+ self.volume_type,
+ body,
+ )
+ self.encryption_types_mock.create.assert_not_called()
+ self.assertIsNone(result)
+
+ def test_type_set_new_encryption_without_provider(self):
+ self.encryption_types_mock.update.side_effect = (
+ exceptions.NotFound('NotFound'))
+ arglist = [
+ '--encryption-cipher', 'aes-xts-plain64',
+ '--encryption-key-size', '128',
+ '--encryption-control-location', 'front-end',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_cipher', 'aes-xts-plain64'),
+ ('encryption_key_size', 128),
+ ('encryption_control_location', 'front-end'),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ self.fail('CommandError should be raised.')
+ except exceptions.CommandError as e:
+ self.assertEqual("Command Failed: One or more of"
+ " the operations failed",
+ str(e))
+ body = {
+ 'cipher': 'aes-xts-plain64',
+ 'key_size': 128,
+ 'control_location': 'front-end',
+ }
+ self.encryption_types_mock.update.assert_called_with(
+ self.volume_type,
+ body,
+ )
+ self.encryption_types_mock.create.assert_not_called()
+
class TestTypeShow(TestType):
@@ -489,6 +700,7 @@ class TestTypeShow(TestType):
self.volume_type.id
]
verifylist = [
+ ("encryption_type", False),
("volume_type", self.volume_type.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@@ -564,6 +776,52 @@ class TestTypeShow(TestType):
)
self.assertEqual(private_type_data, data)
+ def test_type_show_with_encryption(self):
+ encryption_type = volume_fakes.FakeType.create_one_encryption_type()
+ encryption_info = {
+ 'provider': 'LuksEncryptor',
+ 'cipher': None,
+ 'key_size': None,
+ 'control_location': 'front-end',
+ }
+ self.volume_type = volume_fakes.FakeType.create_one_type(
+ attrs={'encryption': encryption_info})
+ self.types_mock.get.return_value = self.volume_type
+ self.encryption_types_mock.get.return_value = encryption_type
+ encryption_columns = (
+ 'access_project_ids',
+ 'description',
+ 'encryption',
+ 'id',
+ 'is_public',
+ 'name',
+ 'properties',
+ )
+ encryption_data = (
+ None,
+ self.volume_type.description,
+ utils.format_dict(encryption_info),
+ self.volume_type.id,
+ True,
+ self.volume_type.name,
+ utils.format_dict(self.volume_type.extra_specs)
+ )
+ arglist = [
+ '--encryption-type',
+ self.volume_type.id
+ ]
+ verifylist = [
+ ('encryption_type', True),
+ ("volume_type", self.volume_type.id)
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ columns, data = self.cmd.take_action(parsed_args)
+ self.types_mock.get.assert_called_with(self.volume_type.id)
+ self.encryption_types_mock.get.assert_called_with(self.volume_type.id)
+ self.assertEqual(encryption_columns, columns)
+ self.assertEqual(encryption_data, data)
+
class TestTypeUnset(TestType):
@@ -625,6 +883,7 @@ class TestTypeUnset(TestType):
self.volume_type.id,
]
verifylist = [
+ ('encryption_type', False),
('project', ''),
('volume_type', self.volume_type.id),
]
@@ -633,7 +892,7 @@ class TestTypeUnset(TestType):
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
-
+ self.encryption_types_mock.delete.assert_not_called()
self.assertFalse(self.types_access_mock.remove_project_access.called)
def test_type_unset_failed_with_missing_volume_type_argument(self):
@@ -649,3 +908,18 @@ class TestTypeUnset(TestType):
self.cmd,
arglist,
verifylist)
+
+ def test_type_unset_encryption_type(self):
+ arglist = [
+ '--encryption-type',
+ self.volume_type.id,
+ ]
+ verifylist = [
+ ('encryption_type', True),
+ ('volume_type', self.volume_type.id),
+ ]
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+
+ result = self.cmd.take_action(parsed_args)
+ self.encryption_types_mock.delete.assert_called_with(self.volume_type)
+ self.assertIsNone(result)
diff --git a/openstackclient/volume/v1/volume_type.py b/openstackclient/volume/v1/volume_type.py
index 8adce322..f9baa5be 100644
--- a/openstackclient/volume/v1/volume_type.py
+++ b/openstackclient/volume/v1/volume_type.py
@@ -29,6 +29,26 @@ from openstackclient.i18n import _
LOG = logging.getLogger(__name__)
+def _create_encryption_type(volume_client, volume_type, parsed_args):
+ if not parsed_args.encryption_provider:
+ msg = _("'--encryption-provider' should be specified while "
+ "creating a new encryption type")
+ raise exceptions.CommandError(msg)
+ # set the default of control location while creating
+ control_location = 'front-end'
+ if parsed_args.encryption_control_location:
+ control_location = parsed_args.encryption_control_location
+ body = {
+ 'provider': parsed_args.encryption_provider,
+ 'cipher': parsed_args.encryption_cipher,
+ 'key_size': parsed_args.encryption_key_size,
+ 'control_location': control_location
+ }
+ encryption = volume_client.volume_encryption_types.create(
+ volume_type, body)
+ return encryption
+
+
class CreateVolumeType(command.ShowOne):
_description = _("Create new volume type")
@@ -46,6 +66,42 @@ class CreateVolumeType(command.ShowOne):
help=_('Set a property on this volume type '
'(repeat option to set multiple properties)'),
)
+ # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option.
+ parser.add_argument(
+ '--encryption-provider',
+ metavar='<provider>',
+ help=_('Set the class that provides encryption support for '
+ 'this volume type (e.g "LuksEncryptor") (admin only) '
+ '(This option is required when setting encryption type '
+ 'of a volume. Consider using other encryption options '
+ 'such as: "--encryption-cipher", "--encryption-key-size" '
+ 'and "--encryption-control-location")'),
+ )
+ parser.add_argument(
+ '--encryption-cipher',
+ metavar='<cipher>',
+ help=_('Set the encryption algorithm or mode for this '
+ 'volume type (e.g "aes-xts-plain64") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-key-size',
+ metavar='<key-size>',
+ type=int,
+ help=_('Set the size of the encryption key of this '
+ 'volume type (e.g "128" or "256") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-control-location',
+ metavar='<control-location>',
+ choices=['front-end', 'back-end'],
+ help=_('Set the notional service where the encryption is '
+ 'performed ("front-end" or "back-end") (admin only) '
+ '(The default value for this option is "front-end" '
+ 'when setting encryption type of a volume. Consider '
+ 'using other encryption options such as: '
+ '"--encryption-cipher", "--encryption-key-size" and '
+ '"--encryption-provider")'),
+ )
return parser
def take_action(self, parsed_args):
@@ -55,6 +111,21 @@ class CreateVolumeType(command.ShowOne):
if parsed_args.property:
result = volume_type.set_keys(parsed_args.property)
volume_type._info.update({'properties': utils.format_dict(result)})
+ if (parsed_args.encryption_provider or
+ parsed_args.encryption_cipher or
+ parsed_args.encryption_key_size or
+ parsed_args.encryption_control_location):
+ try:
+ # create new encryption
+ encryption = _create_encryption_type(
+ volume_client, volume_type, parsed_args)
+ except Exception as e:
+ LOG.error(_("Failed to set encryption information for this "
+ "volume type: %s"), e)
+ # add encryption info in result
+ encryption._info.pop("volume_type_id", None)
+ volume_type._info.update(
+ {'encryption': utils.format_dict(encryption._info)})
volume_type._info.pop("os-volume-type-access:is_public", None)
return zip(*sorted(six.iteritems(volume_type._info)))
@@ -107,20 +178,58 @@ class ListVolumeType(command.Lister):
default=False,
help=_('List additional fields in output')
)
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Display encryption information for each volume type "
+ "(admin only)"),
+ )
return parser
def take_action(self, parsed_args):
+ volume_client = self.app.client_manager.volume
if parsed_args.long:
- columns = ('ID', 'Name', 'Is Public', 'Extra Specs')
- column_headers = ('ID', 'Name', 'Is Public', 'Properties')
+ columns = ['ID', 'Name', 'Is Public', 'Extra Specs']
+ column_headers = ['ID', 'Name', 'Is Public', 'Properties']
else:
- columns = ('ID', 'Name', 'Is Public')
- column_headers = columns
- data = self.app.client_manager.volume.volume_types.list()
+ columns = ['ID', 'Name', 'Is Public']
+ column_headers = ['ID', 'Name', 'Is Public']
+ data = volume_client.volume_types.list()
+
+ def _format_encryption_info(type_id, encryption_data=None):
+ encryption_data = encryption
+ encryption_info = '-'
+ if type_id in encryption_data.keys():
+ encryption_info = encryption_data[type_id]
+ return encryption_info
+
+ if parsed_args.encryption_type:
+ encryption = {}
+ for d in volume_client.volume_encryption_types.list():
+ volume_type_id = d._info['volume_type_id']
+ # remove some redundant information
+ del_key = [
+ 'deleted',
+ 'created_at',
+ 'updated_at',
+ 'deleted_at',
+ 'volume_type_id'
+ ]
+ for key in del_key:
+ d._info.pop(key, None)
+ # save the encryption information with their volume type ID
+ encryption[volume_type_id] = utils.format_dict(d._info)
+ # We need to get volume type ID, then show encryption
+ # information according to the ID, so use "id" to keep
+ # difference to the real "ID" column.
+ columns += ['id']
+ column_headers += ['Encryption']
+
return (column_headers,
(utils.get_item_properties(
s, columns,
- formatters={'Extra Specs': utils.format_dict},
+ formatters={'Extra Specs': utils.format_dict,
+ 'id': _format_encryption_info},
) for s in data))
@@ -141,6 +250,42 @@ class SetVolumeType(command.Command):
help=_('Set a property on this volume type '
'(repeat option to set multiple properties)'),
)
+ # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option.
+ parser.add_argument(
+ '--encryption-provider',
+ metavar='<provider>',
+ help=_('Set the class that provides encryption support for '
+ 'this volume type (e.g "LuksEncryptor") (admin only) '
+ '(This option is required when setting encryption type '
+ 'of a volume. Consider using other encryption options '
+ 'such as: "--encryption-cipher", "--encryption-key-size" '
+ 'and "--encryption-control-location")'),
+ )
+ parser.add_argument(
+ '--encryption-cipher',
+ metavar='<cipher>',
+ help=_('Set the encryption algorithm or mode for this '
+ 'volume type (e.g "aes-xts-plain64") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-key-size',
+ metavar='<key-size>',
+ type=int,
+ help=_('Set the size of the encryption key of this '
+ 'volume type (e.g "128" or "256") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-control-location',
+ metavar='<control-location>',
+ choices=['front-end', 'back-end'],
+ help=_('Set the notional service where the encryption is '
+ 'performed ("front-end" or "back-end") (admin only) '
+ '(The default value for this option is "front-end" '
+ 'when setting encryption type of a volume. Consider '
+ 'using other encryption options such as: '
+ '"--encryption-cipher", "--encryption-key-size" and '
+ '"--encryption-provider")'),
+ )
return parser
def take_action(self, parsed_args):
@@ -148,8 +293,29 @@ class SetVolumeType(command.Command):
volume_type = utils.find_resource(
volume_client.volume_types, parsed_args.volume_type)
+ result = 0
if parsed_args.property:
- volume_type.set_keys(parsed_args.property)
+ try:
+ volume_type.set_keys(parsed_args.property)
+ except Exception as e:
+ LOG.error(_("Failed to set volume type property: %s"), e)
+ result += 1
+
+ if (parsed_args.encryption_provider or
+ parsed_args.encryption_cipher or
+ parsed_args.encryption_key_size or
+ parsed_args.encryption_control_location):
+ try:
+ _create_encryption_type(
+ volume_client, volume_type, parsed_args)
+ except Exception as e:
+ LOG.error(_("Failed to set encryption information for this "
+ "volume type: %s"), e)
+ result += 1
+
+ if result > 0:
+ raise exceptions.CommandError(_("Command Failed: One or more of"
+ " the operations failed"))
class ShowVolumeType(command.ShowOne):
@@ -162,6 +328,12 @@ class ShowVolumeType(command.ShowOne):
metavar="<volume-type>",
help=_("Volume type to display (name or ID)")
)
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Display encryption information of this volume type "
+ "(admin only)"),
+ )
return parser
def take_action(self, parsed_args):
@@ -170,6 +342,17 @@ class ShowVolumeType(command.ShowOne):
volume_client.volume_types, parsed_args.volume_type)
properties = utils.format_dict(volume_type._info.pop('extra_specs'))
volume_type._info.update({'properties': properties})
+ if parsed_args.encryption_type:
+ # show encryption type information for this volume type
+ try:
+ encryption = volume_client.volume_encryption_types.get(
+ volume_type.id)
+ encryption._info.pop("volume_type_id", None)
+ volume_type._info.update(
+ {'encryption': utils.format_dict(encryption._info)})
+ except Exception as e:
+ LOG.error(_("Failed to display the encryption information "
+ "of this volume type: %s"), e)
volume_type._info.pop("os-volume-type-access:is_public", None)
return zip(*sorted(six.iteritems(volume_type._info)))
@@ -191,6 +374,12 @@ class UnsetVolumeType(command.Command):
help=_('Remove a property from this volume type '
'(repeat option to remove multiple properties)'),
)
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Remove the encryption type for this volume type "
+ "(admin oly)"),
+ )
return parser
def take_action(self, parsed_args):
@@ -200,5 +389,21 @@ class UnsetVolumeType(command.Command):
parsed_args.volume_type,
)
+ result = 0
if parsed_args.property:
- volume_type.unset_keys(parsed_args.property)
+ try:
+ volume_type.unset_keys(parsed_args.property)
+ except Exception as e:
+ LOG.error(_("Failed to unset volume type property: %s"), e)
+ result += 1
+ if parsed_args.encryption_type:
+ try:
+ volume_client.volume_encryption_types.delete(volume_type)
+ except Exception as e:
+ LOG.error(_("Failed to remove the encryption type for this "
+ "volume type: %s"), e)
+ result += 1
+
+ if result > 0:
+ raise exceptions.CommandError(_("Command Failed: One or more of"
+ " the operations failed"))
diff --git a/openstackclient/volume/v2/volume_type.py b/openstackclient/volume/v2/volume_type.py
index 46466783..8d2901f2 100644
--- a/openstackclient/volume/v2/volume_type.py
+++ b/openstackclient/volume/v2/volume_type.py
@@ -29,6 +29,44 @@ from openstackclient.identity import common as identity_common
LOG = logging.getLogger(__name__)
+def _create_encryption_type(volume_client, volume_type, parsed_args):
+ if not parsed_args.encryption_provider:
+ msg = _("'--encryption-provider' should be specified while "
+ "creating a new encryption type")
+ raise exceptions.CommandError(msg)
+ # set the default of control location while creating
+ control_location = 'front-end'
+ if parsed_args.encryption_control_location:
+ control_location = parsed_args.encryption_control_location
+ body = {
+ 'provider': parsed_args.encryption_provider,
+ 'cipher': parsed_args.encryption_cipher,
+ 'key_size': parsed_args.encryption_key_size,
+ 'control_location': control_location
+ }
+ encryption = volume_client.volume_encryption_types.create(
+ volume_type, body)
+ return encryption
+
+
+def _set_encryption_type(volume_client, volume_type, parsed_args):
+ # update the existing encryption type
+ body = {}
+ for attr in ['provider', 'cipher', 'key_size', 'control_location']:
+ info = getattr(parsed_args, 'encryption_' + attr, None)
+ if info is not None:
+ body[attr] = info
+ try:
+ volume_client.volume_encryption_types.update(volume_type, body)
+ except Exception as e:
+ if type(e).__name__ == 'NotFound':
+ # create new encryption type
+ LOG.warning(_("No existing encryption type found, creating "
+ "new encryption type for this volume type ..."))
+ _create_encryption_type(
+ volume_client, volume_type, parsed_args)
+
+
class CreateVolumeType(command.ShowOne):
_description = _("Create new volume type")
@@ -70,6 +108,42 @@ class CreateVolumeType(command.ShowOne):
help=_("Allow <project> to access private type (name or ID) "
"(Must be used with --private option)"),
)
+ # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option.
+ parser.add_argument(
+ '--encryption-provider',
+ metavar='<provider>',
+ help=_('Set the class that provides encryption support for '
+ 'this volume type (e.g "LuksEncryptor") (admin only) '
+ '(This option is required when setting encryption type '
+ 'of a volume. Consider using other encryption options '
+ 'such as: "--encryption-cipher", "--encryption-key-size" '
+ 'and "--encryption-control-location")'),
+ )
+ parser.add_argument(
+ '--encryption-cipher',
+ metavar='<cipher>',
+ help=_('Set the encryption algorithm or mode for this '
+ 'volume type (e.g "aes-xts-plain64") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-key-size',
+ metavar='<key-size>',
+ type=int,
+ help=_('Set the size of the encryption key of this '
+ 'volume type (e.g "128" or "256") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-control-location',
+ metavar='<control-location>',
+ choices=['front-end', 'back-end'],
+ help=_('Set the notional service where the encryption is '
+ 'performed ("front-end" or "back-end") (admin only) '
+ '(The default value for this option is "front-end" '
+ 'when setting encryption type of a volume. Consider '
+ 'using other encryption options such as: '
+ '"--encryption-cipher", "--encryption-key-size" and '
+ '"--encryption-provider")'),
+ )
identity_common.add_project_domain_option_to_parser(parser)
return parser
@@ -110,6 +184,21 @@ class CreateVolumeType(command.ShowOne):
if parsed_args.property:
result = volume_type.set_keys(parsed_args.property)
volume_type._info.update({'properties': utils.format_dict(result)})
+ if (parsed_args.encryption_provider or
+ parsed_args.encryption_cipher or
+ parsed_args.encryption_key_size or
+ parsed_args.encryption_control_location):
+ try:
+ # create new encryption
+ encryption = _create_encryption_type(
+ volume_client, volume_type, parsed_args)
+ except Exception as e:
+ LOG.error(_("Failed to set encryption information for this "
+ "volume type: %s"), e)
+ # add encryption info in result
+ encryption._info.pop("volume_type_id", None)
+ volume_type._info.update(
+ {'encryption': utils.format_dict(encryption._info)})
volume_type._info.pop("os-volume-type-access:is_public", None)
return zip(*sorted(six.iteritems(volume_type._info)))
@@ -179,6 +268,12 @@ class ListVolumeType(command.Lister):
action="store_true",
help=_("List only private types (admin only)")
)
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Display encryption information for each volume type "
+ "(admin only)"),
+ )
return parser
def take_action(self, parsed_args):
@@ -189,7 +284,7 @@ class ListVolumeType(command.Lister):
'ID', 'Name', 'Is Public', 'Description', 'Properties']
else:
columns = ['ID', 'Name', 'Is Public']
- column_headers = columns
+ column_headers = ['ID', 'Name', 'Is Public']
if parsed_args.default:
data = [volume_client.volume_types.default()]
else:
@@ -200,10 +295,41 @@ class ListVolumeType(command.Lister):
is_public = False
data = volume_client.volume_types.list(
is_public=is_public)
+
+ def _format_encryption_info(type_id, encryption_data=None):
+ encryption_data = encryption
+ encryption_info = '-'
+ if type_id in encryption_data.keys():
+ encryption_info = encryption_data[type_id]
+ return encryption_info
+
+ if parsed_args.encryption_type:
+ encryption = {}
+ for d in volume_client.volume_encryption_types.list():
+ volume_type_id = d._info['volume_type_id']
+ # remove some redundant information
+ del_key = [
+ 'deleted',
+ 'created_at',
+ 'updated_at',
+ 'deleted_at',
+ 'volume_type_id'
+ ]
+ for key in del_key:
+ d._info.pop(key, None)
+ # save the encryption information with their volume type ID
+ encryption[volume_type_id] = utils.format_dict(d._info)
+ # We need to get volume type ID, then show encryption
+ # information according to the ID, so use "id" to keep
+ # difference to the real "ID" column.
+ columns += ['id']
+ column_headers += ['Encryption']
+
return (column_headers,
(utils.get_item_properties(
s, columns,
- formatters={'Extra Specs': utils.format_dict},
+ formatters={'Extra Specs': utils.format_dict,
+ 'id': _format_encryption_info},
) for s in data))
@@ -241,7 +367,43 @@ class SetVolumeType(command.Command):
'(admin only)'),
)
identity_common.add_project_domain_option_to_parser(parser)
-
+ # TODO(Huanxuan Ao): Add choices for each "--encryption-*" option.
+ parser.add_argument(
+ '--encryption-provider',
+ metavar='<provider>',
+ help=_('Set the class that provides encryption support for '
+ 'this volume type (e.g "LuksEncryptor") (admin only) '
+ '(This option is required when setting encryption type '
+ 'of a volume for the first time. Consider using other '
+ 'encryption options such as: "--encryption-cipher", '
+ '"--encryption-key-size" and '
+ '"--encryption-control-location")'),
+ )
+ parser.add_argument(
+ '--encryption-cipher',
+ metavar='<cipher>',
+ help=_('Set the encryption algorithm or mode for this '
+ 'volume type (e.g "aes-xts-plain64") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-key-size',
+ metavar='<key-size>',
+ type=int,
+ help=_('Set the size of the encryption key of this '
+ 'volume type (e.g "128" or "256") (admin only)'),
+ )
+ parser.add_argument(
+ '--encryption-control-location',
+ metavar='<control-location>',
+ choices=['front-end', 'back-end'],
+ help=_('Set the notional service where the encryption is '
+ 'performed ("front-end" or "back-end") (admin only) '
+ '(The default value for this option is "front-end" '
+ 'when setting encryption type of a volume for the '
+ 'first time. Consider using other encryption options '
+ 'such as: "--encryption-cipher", "--encryption-key-size" '
+ 'and "--encryption-provider")'),
+ )
return parser
def take_action(self, parsed_args):
@@ -290,6 +452,17 @@ class SetVolumeType(command.Command):
"project: %s"), e)
result += 1
+ if (parsed_args.encryption_provider or
+ parsed_args.encryption_cipher or
+ parsed_args.encryption_key_size or
+ parsed_args.encryption_control_location):
+ try:
+ _set_encryption_type(volume_client, volume_type, parsed_args)
+ except Exception as e:
+ LOG.error(_("Failed to set encryption information for this "
+ "volume type: %s"), e)
+ result += 1
+
if result > 0:
raise exceptions.CommandError(_("Command Failed: One or more of"
" the operations failed"))
@@ -305,6 +478,12 @@ class ShowVolumeType(command.ShowOne):
metavar="<volume-type>",
help=_("Volume type to display (name or ID)")
)
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Display encryption information of this volume type "
+ "(admin only)"),
+ )
return parser
def take_action(self, parsed_args):
@@ -329,6 +508,17 @@ class ShowVolumeType(command.ShowOne):
'%(type)s: %(e)s')
LOG.error(msg % {'type': volume_type.id, 'e': e})
volume_type._info.update({'access_project_ids': access_project_ids})
+ if parsed_args.encryption_type:
+ # show encryption type information for this volume type
+ try:
+ encryption = volume_client.volume_encryption_types.get(
+ volume_type.id)
+ encryption._info.pop("volume_type_id", None)
+ volume_type._info.update(
+ {'encryption': utils.format_dict(encryption._info)})
+ except Exception as e:
+ LOG.error(_("Failed to display the encryption information "
+ "of this volume type: %s"), e)
volume_type._info.pop("os-volume-type-access:is_public", None)
return zip(*sorted(six.iteritems(volume_type._info)))
@@ -357,7 +547,12 @@ class UnsetVolumeType(command.Command):
' (admin only)'),
)
identity_common.add_project_domain_option_to_parser(parser)
-
+ parser.add_argument(
+ "--encryption-type",
+ action="store_true",
+ help=_("Remove the encryption type for this volume type "
+ "(admin only)"),
+ )
return parser
def take_action(self, parsed_args):
@@ -391,6 +586,13 @@ class UnsetVolumeType(command.Command):
LOG.error(_("Failed to remove volume type access from "
"project: %s"), e)
result += 1
+ if parsed_args.encryption_type:
+ try:
+ volume_client.volume_encryption_types.delete(volume_type)
+ except Exception as e:
+ LOG.error(_("Failed to remove the encryption type for this "
+ "volume type: %s"), e)
+ result += 1
if result > 0:
raise exceptions.CommandError(_("Command Failed: One or more of"