diff options
| author | Dean Troyer <dtroyer@gmail.com> | 2017-09-14 11:06:51 -0500 |
|---|---|---|
| committer | Dean Troyer <dtroyer@gmail.com> | 2017-09-14 11:16:11 -0500 |
| commit | 5309bf5f873aba4e994c9011a893987c40d56eda (patch) | |
| tree | 9e1bf90f6bb27cc6e0a5f9b4ef81ef8431d21599 /openstackclient/tests/functional | |
| parent | 5e5239d5eb635470b0c3bc63c4a6263808c41c9b (diff) | |
| parent | f63bb180036c0f2d096eefaaaf2fbdd60a427343 (diff) | |
| download | python-openstackclient-5309bf5f873aba4e994c9011a893987c40d56eda.tar.gz | |
Merge remote-tracking branch 'origin/master' into f4-merge-branch
Change-Id: Ie6c321e67aa9338334e4649879e60847a5d1eb56
Diffstat (limited to 'openstackclient/tests/functional')
12 files changed, 412 insertions, 446 deletions
diff --git a/openstackclient/tests/functional/base.py b/openstackclient/tests/functional/base.py index 4c88b13e..90bbc24d 100644 --- a/openstackclient/tests/functional/base.py +++ b/openstackclient/tests/functional/base.py @@ -14,10 +14,10 @@ import os import re import shlex import subprocess -import testtools from tempest.lib.cli import output_parser from tempest.lib import exceptions +import testtools COMMON_DIR = os.path.dirname(os.path.abspath(__file__)) diff --git a/openstackclient/tests/functional/compute/v2/test_keypair.py b/openstackclient/tests/functional/compute/v2/test_keypair.py index 1e1a03d6..9a88e66f 100644 --- a/openstackclient/tests/functional/compute/v2/test_keypair.py +++ b/openstackclient/tests/functional/compute/v2/test_keypair.py @@ -13,11 +13,11 @@ import json import tempfile -from openstackclient.tests.functional import base - from tempest.lib.common.utils import data_utils from tempest.lib import exceptions +from openstackclient.tests.functional import base + class KeypairBase(base.TestCase): """Methods for functional tests.""" diff --git a/openstackclient/tests/functional/identity/v3/test_service_provider.py b/openstackclient/tests/functional/identity/v3/test_service_provider.py index e072bc93..32b7a463 100644 --- a/openstackclient/tests/functional/identity/v3/test_service_provider.py +++ b/openstackclient/tests/functional/identity/v3/test_service_provider.py @@ -10,9 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. -from openstackclient.tests.functional.identity.v3 import common from tempest.lib.common.utils import data_utils +from openstackclient.tests.functional.identity.v3 import common + class ServiceProviderTests(common.IdentityTests): # Introduce functional test cases for command 'Service Provider' diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py index 53c15ecf..27c1b6aa 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -20,34 +21,6 @@ from openstackclient.tests.functional.network.v2 import common class NetworkQosPolicyTests(common.NetworkTests): """Functional tests for QoS policy""" - HEADERS = ['Name'] - FIELDS = ['name'] - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.NAME = uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - raw_output = cls.openstack( - 'network qos policy create ' + - cls.NAME + - opts - ) - cls.assertOutput(cls.NAME + "\n", raw_output) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos policy delete ' + - cls.NAME - ) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosPolicyTests, cls).tearDownClass() def setUp(self): super(NetworkQosPolicyTests, self).setUp() @@ -55,33 +28,46 @@ class NetworkQosPolicyTests(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.NAME = uuid.uuid4().hex + cmd_output = json.loads(self.openstack( + 'network qos policy create -f json ' + + self.NAME + )) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.NAME) + self.assertEqual(self.NAME, cmd_output['name']) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos policy delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + raw_output = self.openstack( + 'network qos policy delete ' + policy_name) + self.assertEqual('', raw_output) + def test_qos_policy_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos policy list' + opts) - self.assertIn(self.NAME, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy list -f json')) + self.assertIn(self.NAME, [p['Name'] for p in cmd_output]) def test_qos_policy_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual(self.NAME + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertEqual(self.NAME, cmd_output['name']) def test_qos_policy_set(self): self.openstack('network qos policy set --share ' + self.NAME) - opts = self.get_opts(['shared']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("True\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertTrue(cmd_output['shared']) def test_qos_policy_default(self): self.openstack('network qos policy set --default ' + self.NAME) - opts = self.get_opts(['is_default']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("True\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertTrue(cmd_output['is_default']) self.openstack('network qos policy set --no-default ' + self.NAME) - opts = self.get_opts(['is_default']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("False\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertFalse(cmd_output['is_default']) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py index 8b34422f..770abe94 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -20,51 +21,6 @@ from openstackclient.tests.functional.network.v2 import common class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests): """Functional tests for QoS minimum bandwidth rule""" - RULE_ID = None - MIN_KBPS = 2800 - MIN_KBPS_MODIFIED = 7500 - DIRECTION = '--egress' - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'minimum-bandwidth' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--min-kbps ' + str(cls.MIN_KBPS) + ' ' + - cls.DIRECTION + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + - cls.QOS_POLICY_NAME - ) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsMinimumBandwidth, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsMinimumBandwidth, self).setUp() @@ -72,72 +28,67 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type minimum-bandwidth ' + + '--min-kbps 2800 ' + + '--egress ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type minimum-bandwidth ' + + '--min-kbps 2800 ' + + '--egress ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --min-kbps ' + - str(self.MIN_KBPS_MODIFIED) + ' ' + + self.openstack('network qos rule set --min-kbps 7500 ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['min_kbps']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(7500, cmd_output['min_kbps']) class NetworkQosRuleTestsDSCPMarking(common.NetworkTests): """Functional tests for QoS DSCP marking rule""" - RULE_ID = None - QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - DSCP_MARK = 8 - DSCP_MARK_MODIFIED = 32 - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'dscp-marking' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--dscp-mark ' + str(cls.DSCP_MARK) + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + cls.QOS_POLICY_NAME) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsDSCPMarking, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsDSCPMarking, self).setUp() @@ -145,78 +96,63 @@ class NetworkQosRuleTestsDSCPMarking(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type dscp-marking ' + + '--dscp-mark 8 ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type dscp-marking ' + + '--dscp-mark 8 ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --dscp-mark ' + - str(self.DSCP_MARK_MODIFIED) + ' ' + + self.openstack('network qos rule set --dscp-mark 32 ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['dscp_mark']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(32, cmd_output['dscp_mark']) class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests): """Functional tests for QoS bandwidth limit rule""" - RULE_ID = None - QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - MAX_KBPS = 10000 - MAX_KBPS_MODIFIED = 15000 - MAX_BURST_KBITS = 1400 - MAX_BURST_KBITS_MODIFIED = 1800 - RULE_DIRECTION = 'egress' - RULE_DIRECTION_MODIFIED = 'ingress' - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'bandwidth-limit' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--max-kbps ' + str(cls.MAX_KBPS) + ' ' + - '--max-burst-kbits ' + str(cls.MAX_BURST_KBITS) + ' ' + - '--' + cls.RULE_DIRECTION + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + cls.QOS_POLICY_NAME) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsBandwidthLimit, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsBandwidthLimit, self).setUp() @@ -224,30 +160,65 @@ class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type bandwidth-limit ' + + '--max-kbps 10000 ' + + '--max-burst-kbits 1400 ' + + '--egress ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type bandwidth-limit ' + + '--max-kbps 10000 ' + + '--max-burst-kbits 1400 ' + + '--egress ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --max-kbps ' + - str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' + - str(self.MAX_BURST_KBITS_MODIFIED) + ' ' + - '--' + self.RULE_DIRECTION_MODIFIED + ' ' + + self.openstack('network qos rule set --max-kbps 15000 ' + + '--max-burst-kbits 1800 ' + + '--ingress ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['direction', 'max_burst_kbps', 'max_kbps']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - expected = (str(self.RULE_DIRECTION_MODIFIED) + "\n" + - str(self.MAX_BURST_KBITS_MODIFIED) + "\n" + - str(self.MAX_KBPS_MODIFIED) + "\n") - self.assertEqual(expected, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(15000, cmd_output['max_kbps']) + self.assertEqual(1800, cmd_output['max_burst_kbps']) + self.assertEqual('ingress', cmd_output['direction']) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py index d7612936..a6ee3e10 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import json + from openstackclient.tests.functional.network.v2 import common @@ -29,6 +31,7 @@ class NetworkQosRuleTypeTests(common.NetworkTests): self.skipTest("No Network service present") def test_qos_rule_type_list(self): - raw_output = self.openstack('network qos rule type list') + cmd_output = json.loads(self.openstack( + 'network qos rule type list -f json')) for rule_type in self.AVAILABLE_RULE_TYPES: - self.assertIn(rule_type, raw_output) + self.assertIn(rule_type, [x['Type'] for x in cmd_output]) diff --git a/openstackclient/tests/functional/network/v2/test_network_rbac.py b/openstackclient/tests/functional/network/v2/test_network_rbac.py index 2206761f..3bbe4f27 100644 --- a/openstackclient/tests/functional/network/v2/test_network_rbac.py +++ b/openstackclient/tests/functional/network/v2/test_network_rbac.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -22,69 +23,51 @@ class NetworkRBACTests(common.NetworkTests): HEADERS = ['ID'] FIELDS = ['id'] - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.NET_NAME = uuid.uuid4().hex - cls.PROJECT_NAME = uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - raw_output = cls.openstack( - 'network create ' + cls.NET_NAME + opts - ) - cls.OBJECT_ID = raw_output.strip('\n') - opts = cls.get_opts(['id', 'object_id']) - raw_output = cls.openstack( - 'network rbac create ' + - cls.OBJECT_ID + - ' --action access_as_shared' + - ' --target-project admin' + - ' --type network' + opts - ) - cls.ID, object_id, rol = tuple(raw_output.split('\n')) - cls.assertOutput(cls.OBJECT_ID, object_id) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output_rbac = cls.openstack( - 'network rbac delete ' + cls.ID - ) - raw_output_network = cls.openstack( - 'network delete ' + cls.OBJECT_ID - ) - cls.assertOutput('', raw_output_rbac) - cls.assertOutput('', raw_output_network) - finally: - super(NetworkRBACTests, cls).tearDownClass() - def setUp(self): super(NetworkRBACTests, self).setUp() # Nothing in this class works with Nova Network if not self.haz_network: self.skipTest("No Network service present") + self.NET_NAME = uuid.uuid4().hex + self.PROJECT_NAME = uuid.uuid4().hex + + cmd_output = json.loads(self.openstack( + 'network create -f json ' + self.NET_NAME + )) + self.addCleanup(self.openstack, + 'network delete ' + cmd_output['id']) + self.OBJECT_ID = cmd_output['id'] + + cmd_output = json.loads(self.openstack( + 'network rbac create -f json ' + + self.OBJECT_ID + + ' --action access_as_shared' + + ' --target-project admin' + + ' --type network' + )) + self.addCleanup(self.openstack, + 'network rbac delete ' + cmd_output['id']) + self.ID = cmd_output['id'] + self.assertEqual(self.OBJECT_ID, cmd_output['object_id']) + def test_network_rbac_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network rbac list' + opts) - self.assertIn(self.ID, raw_output) + cmd_output = json.loads(self.openstack('network rbac list -f json')) + self.assertIn(self.ID, [rbac['ID'] for rbac in cmd_output]) def test_network_rbac_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network rbac show ' + self.ID + opts) - self.assertEqual(self.ID + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network rbac show -f json ' + self.ID)) + self.assertEqual(self.ID, cmd_output['id']) def test_network_rbac_set(self): - opts = self.get_opts(self.FIELDS) - project_id = self.openstack( - 'project create ' + self.PROJECT_NAME + opts) + project_id = json.loads(self.openstack( + 'project create -f json ' + self.PROJECT_NAME))['id'] self.openstack('network rbac set ' + self.ID + ' --target-project ' + self.PROJECT_NAME) - opts = self.get_opts(['target_project_id']) - raw_output_rbac = self.openstack('network rbac show ' + self.ID + opts) + cmd_output_rbac = json.loads(self.openstack( + 'network rbac show -f json ' + self.ID)) + self.assertEqual(project_id, cmd_output_rbac['target_project_id']) raw_output_project = self.openstack( 'project delete ' + self.PROJECT_NAME) - self.assertEqual(project_id, raw_output_rbac) - self.assertOutput('', raw_output_project) + self.assertEqual('', raw_output_project) diff --git a/openstackclient/tests/functional/network/v2/test_network_segment.py b/openstackclient/tests/functional/network/v2/test_network_segment.py index e1dbc7a0..8940273f 100644 --- a/openstackclient/tests/functional/network/v2/test_network_segment.py +++ b/openstackclient/tests/functional/network/v2/test_network_segment.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -18,64 +19,117 @@ from openstackclient.tests.functional.network.v2 import common class NetworkSegmentTests(common.NetworkTests): """Functional tests for network segment""" + @classmethod + def setUpClass(cls): + common.NetworkTests.setUpClass() + if cls.haz_network: + cls.NETWORK_NAME = uuid.uuid4().hex + cls.PHYSICAL_NETWORK_NAME = uuid.uuid4().hex + + # Create a network for the all subnet tests + cmd_output = json.loads(cls.openstack( + 'network create -f json ' + + cls.NETWORK_NAME + )) + # Get network_id for assertEqual + cls.NETWORK_ID = cmd_output["id"] + + @classmethod + def tearDownClass(cls): + try: + if cls.haz_network: + raw_output = cls.openstack( + 'network delete ' + + cls.NETWORK_NAME + ) + cls.assertOutput('', raw_output) + finally: + super(NetworkSegmentTests, cls).tearDownClass() + def setUp(self): super(NetworkSegmentTests, self).setUp() # Nothing in this class works with Nova Network if not self.haz_network: self.skipTest("No Network service present") - self.NETWORK_NAME = uuid.uuid4().hex - self.PHYSICAL_NETWORK_NAME = uuid.uuid4().hex - - # Create a network for the segment - opts = self.get_opts(['id']) - raw_output = self.openstack( - 'network create ' + self.NETWORK_NAME + opts - ) - self.addCleanup(self.openstack, - 'network delete ' + self.NETWORK_NAME) - self.NETWORK_ID = raw_output.strip('\n') - - # Get the segment for the network. - opts = self.get_opts(['ID', 'Network']) - raw_output = self.openstack( - 'network segment list ' - '--network ' + self.NETWORK_NAME + ' ' + - opts + def test_network_segment_create_delete(self): + name = uuid.uuid4().hex + json_output = json.loads(self.openstack( + ' network segment create -f json ' + + '--network ' + self.NETWORK_ID + ' ' + + '--network-type geneve ' + + '--segment 2055 ' + + name + )) + self.assertEqual( + name, + json_output["name"], ) - raw_output_row = raw_output.split('\n')[0] - self.NETWORK_SEGMENT_ID = raw_output_row.split(' ')[0] - def test_network_segment_create_delete(self): - opts = self.get_opts(['id']) raw_output = self.openstack( - ' network segment create --network ' + self.NETWORK_ID + - ' --network-type geneve ' + - ' --segment 2055 test_segment ' + opts + 'network segment delete ' + name, ) - network_segment_id = raw_output.strip('\n') - raw_output = self.openstack('network segment delete ' + - network_segment_id) self.assertOutput('', raw_output) def test_network_segment_list(self): - opts = self.get_opts(['ID']) - raw_output = self.openstack('network segment list' + opts) - self.assertIn(self.NETWORK_SEGMENT_ID, raw_output) + name = uuid.uuid4().hex + json_output = json.loads(self.openstack( + ' network segment create -f json ' + + '--network ' + self.NETWORK_ID + ' ' + + '--network-type geneve ' + + '--segment 2055 ' + + name + )) + network_segment_id = json_output.get('id') + network_segment_name = json_output.get('name') + self.addCleanup( + self.openstack, + 'network segment delete ' + network_segment_id + ) + self.assertEqual( + name, + json_output["name"], + ) + + json_output = json.loads(self.openstack( + 'network segment list -f json' + )) + item_map = { + item.get('ID'): item.get('Name') for item in json_output + } + self.assertIn(network_segment_id, item_map.keys()) + self.assertIn(network_segment_name, item_map.values()) + + def test_network_segment_set_show(self): + name = uuid.uuid4().hex + json_output = json.loads(self.openstack( + ' network segment create -f json ' + + '--network ' + self.NETWORK_ID + ' ' + + '--network-type geneve ' + + '--segment 2055 ' + + name + )) + self.addCleanup( + self.openstack, + 'network segment delete ' + name + ) + self.assertIsNone( + json_output["description"], + ) - def test_network_segment_set(self): new_description = 'new_description' - raw_output = self.openstack('network segment set ' + - '--description ' + new_description + - ' ' + self.NETWORK_SEGMENT_ID) - self.assertOutput('', raw_output) - opts = self.get_opts(['description']) - raw_output = self.openstack('network segment show ' + - self.NETWORK_SEGMENT_ID + opts) - self.assertEqual(new_description + "\n", raw_output) + cmd_output = self.openstack( + 'network segment set ' + + '--description ' + new_description + ' ' + + name + ) + self.assertOutput('', cmd_output) - def test_network_segment_show(self): - opts = self.get_opts(['network_id']) - raw_output = self.openstack('network segment show ' + - self.NETWORK_SEGMENT_ID + opts) - self.assertEqual(self.NETWORK_ID + "\n", raw_output) + json_output = json.loads(self.openstack( + 'network segment show -f json ' + + name + )) + self.assertEqual( + new_description, + json_output["description"], + ) diff --git a/openstackclient/tests/functional/network/v2/test_network_service_provider.py b/openstackclient/tests/functional/network/v2/test_network_service_provider.py index 8ed44dd9..999b7eb7 100644 --- a/openstackclient/tests/functional/network/v2/test_network_service_provider.py +++ b/openstackclient/tests/functional/network/v2/test_network_service_provider.py @@ -13,14 +13,14 @@ # License for the specific language governing permissions and limitations # under the License. +import json + from openstackclient.tests.functional.network.v2 import common class TestNetworkServiceProvider(common.NetworkTests): """Functional tests for network service provider""" - SERVICE_TYPE = 'L3_ROUTER_NAT' - def setUp(self): super(TestNetworkServiceProvider, self).setUp() # Nothing in this class works with Nova Network @@ -28,5 +28,6 @@ class TestNetworkServiceProvider(common.NetworkTests): self.skipTest("No Network service present") def test_network_service_provider_list(self): - raw_output = self.openstack('network service provider list') - self.assertIn(self.SERVICE_TYPE, raw_output) + cmd_output = json.loads(self.openstack( + 'network service provider list -f json')) + self.assertIn('L3_ROUTER_NAT', [x['Service Type'] for x in cmd_output]) diff --git a/openstackclient/tests/functional/network/v2/test_security_group.py b/openstackclient/tests/functional/network/v2/test_security_group.py index b601c913..8ae24b72 100644 --- a/openstackclient/tests/functional/network/v2/test_security_group.py +++ b/openstackclient/tests/functional/network/v2/test_security_group.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -17,44 +18,6 @@ from openstackclient.tests.functional.network.v2 import common class SecurityGroupTests(common.NetworkTests): """Functional tests for security group""" - HEADERS = ['Name'] - FIELDS = ['name'] - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.NAME = uuid.uuid4().hex - cls.OTHER_NAME = uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - raw_output = cls.openstack( - 'security group create ' + - cls.NAME + - opts - ) - expected = cls.NAME + '\n' - cls.assertOutput(expected, raw_output) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - # Rename test - raw_output = cls.openstack( - 'security group set --name ' + - cls.OTHER_NAME + ' ' + - cls.NAME - ) - cls.assertOutput('', raw_output) - # Delete test - raw_output = cls.openstack( - 'security group delete ' + - cls.OTHER_NAME - ) - cls.assertOutput('', raw_output) - finally: - super(SecurityGroupTests, cls).tearDownClass() def setUp(self): super(SecurityGroupTests, self).setUp() @@ -62,22 +25,33 @@ class SecurityGroupTests(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.NAME = uuid.uuid4().hex + self.OTHER_NAME = uuid.uuid4().hex + cmd_output = json.loads(self.openstack( + 'security group create -f json ' + + self.NAME + )) + self.addCleanup(self.openstack, + 'security group delete ' + cmd_output['id']) + self.assertEqual(self.NAME, cmd_output['name']) + def test_security_group_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('security group list' + opts) - self.assertIn(self.NAME, raw_output) + cmd_output = json.loads(self.openstack('security group list -f json')) + self.assertIn(self.NAME, [sg['Name'] for sg in cmd_output]) def test_security_group_set(self): + other_name = uuid.uuid4().hex raw_output = self.openstack( - 'security group set --description NSA ' + self.NAME + 'security group set --description NSA --name ' + + other_name + ' ' + self.NAME ) self.assertEqual('', raw_output) - opts = self.get_opts(['description']) - raw_output = self.openstack('security group show ' + self.NAME + opts) - self.assertEqual("NSA\n", raw_output) + cmd_output = json.loads(self.openstack( + 'security group show -f json ' + other_name)) + self.assertEqual('NSA', cmd_output['description']) def test_security_group_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('security group show ' + self.NAME + opts) - self.assertEqual(self.NAME + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'security group show -f json ' + self.NAME)) + self.assertEqual(self.NAME, cmd_output['name']) diff --git a/openstackclient/tests/functional/network/v2/test_security_group_rule.py b/openstackclient/tests/functional/network/v2/test_security_group_rule.py index 40951a01..fe78bf47 100644 --- a/openstackclient/tests/functional/network/v2/test_security_group_rule.py +++ b/openstackclient/tests/functional/network/v2/test_security_group_rule.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -17,54 +18,6 @@ from openstackclient.tests.functional.network.v2 import common class SecurityGroupRuleTests(common.NetworkTests): """Functional tests for security group rule""" - SECURITY_GROUP_RULE_ID = None - NAME_FIELD = ['name'] - ID_FIELD = ['id'] - ID_HEADER = ['ID'] - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.SECURITY_GROUP_NAME = uuid.uuid4().hex - - # Create the security group to hold the rule - opts = cls.get_opts(cls.NAME_FIELD) - raw_output = cls.openstack( - 'security group create ' + - cls.SECURITY_GROUP_NAME + - opts - ) - expected = cls.SECURITY_GROUP_NAME + '\n' - cls.assertOutput(expected, raw_output) - - # Create the security group rule. - opts = cls.get_opts(cls.ID_FIELD) - raw_output = cls.openstack( - 'security group rule create ' + - cls.SECURITY_GROUP_NAME + ' ' + - '--protocol tcp --dst-port 80:80 ' + - '--ingress --ethertype IPv4 ' + - opts - ) - cls.SECURITY_GROUP_RULE_ID = raw_output.strip('\n') - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'security group rule delete ' + - cls.SECURITY_GROUP_RULE_ID - ) - cls.assertOutput('', raw_output) - raw_output = cls.openstack( - 'security group delete ' + - cls.SECURITY_GROUP_NAME - ) - cls.assertOutput('', raw_output) - finally: - super(SecurityGroupRuleTests, cls).tearDownClass() def setUp(self): super(SecurityGroupRuleTests, self).setUp() @@ -72,16 +25,35 @@ class SecurityGroupRuleTests(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.SECURITY_GROUP_NAME = uuid.uuid4().hex + + # Create the security group to hold the rule + cmd_output = json.loads(self.openstack( + 'security group create -f json ' + + self.SECURITY_GROUP_NAME + )) + self.addCleanup(self.openstack, + 'security group delete ' + self.SECURITY_GROUP_NAME) + self.assertEqual(self.SECURITY_GROUP_NAME, cmd_output['name']) + + # Create the security group rule. + cmd_output = json.loads(self.openstack( + 'security group rule create -f json ' + + self.SECURITY_GROUP_NAME + ' ' + + '--protocol tcp --dst-port 80:80 ' + + '--ingress --ethertype IPv4 ' + )) + self.addCleanup(self.openstack, + 'security group rule delete ' + cmd_output['id']) + self.SECURITY_GROUP_RULE_ID = cmd_output['id'] + def test_security_group_rule_list(self): - opts = self.get_opts(self.ID_HEADER) - raw_output = self.openstack('security group rule list ' + - self.SECURITY_GROUP_NAME + - opts) - self.assertIn(self.SECURITY_GROUP_RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'security group rule list -f json ' + self.SECURITY_GROUP_NAME)) + self.assertIn(self.SECURITY_GROUP_RULE_ID, + [rule['ID'] for rule in cmd_output]) def test_security_group_rule_show(self): - opts = self.get_opts(self.ID_FIELD) - raw_output = self.openstack('security group rule show ' + - self.SECURITY_GROUP_RULE_ID + - opts) - self.assertEqual(self.SECURITY_GROUP_RULE_ID + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'security group rule show -f json ' + self.SECURITY_GROUP_RULE_ID)) + self.assertEqual(self.SECURITY_GROUP_RULE_ID, cmd_output['id']) diff --git a/openstackclient/tests/functional/volume/v1/common.py b/openstackclient/tests/functional/volume/v1/common.py index 4978cea3..bb3c674e 100644 --- a/openstackclient/tests/functional/volume/v1/common.py +++ b/openstackclient/tests/functional/volume/v1/common.py @@ -12,14 +12,35 @@ import fixtures -from openstackclient.tests.functional.volume import base +from openstackclient.tests.functional import base +from openstackclient.tests.functional.volume import base as volume_base -class BaseVolumeTests(base.BaseVolumeTests): - """Base class for Volume functional tests. """ +class BaseVolumeTests(volume_base.BaseVolumeTests): + """Base class for Volume functional tests""" + + @classmethod + def setUpClass(cls): + super(BaseVolumeTests, cls).setUpClass() + # TODO(dtroyer): This needs to be updated to specifically check for + # Volume v1 rather than just 'volume', but for now + # that is enough until we get proper version negotiation + cls.haz_volume_v1 = base.is_service_enabled('volume') def setUp(self): super(BaseVolumeTests, self).setUp() + + # This class requires Volume v1 + # if not self.haz_volume_v1: + # self.skipTest("No Volume v1 service present") + + # TODO(dtroyer): We really want the above to work but right now + # (12Sep2017) DevStack still creates a 'volume' + # service type even though there is no service behind + # it. Until that is fixed we need to just skip the + # volume v1 functional tests in master. + self.skipTest("No Volume v1 service present") + ver_fixture = fixtures.EnvironmentVariable( 'OS_VOLUME_API_VERSION', '1' ) |
