summaryrefslogtreecommitdiff
path: root/openstackclient/tests/functional
diff options
context:
space:
mode:
authorDean Troyer <dtroyer@gmail.com>2017-09-14 11:06:51 -0500
committerDean Troyer <dtroyer@gmail.com>2017-09-14 11:16:11 -0500
commit5309bf5f873aba4e994c9011a893987c40d56eda (patch)
tree9e1bf90f6bb27cc6e0a5f9b4ef81ef8431d21599 /openstackclient/tests/functional
parent5e5239d5eb635470b0c3bc63c4a6263808c41c9b (diff)
parentf63bb180036c0f2d096eefaaaf2fbdd60a427343 (diff)
downloadpython-openstackclient-5309bf5f873aba4e994c9011a893987c40d56eda.tar.gz
Merge remote-tracking branch 'origin/master' into f4-merge-branch
Change-Id: Ie6c321e67aa9338334e4649879e60847a5d1eb56
Diffstat (limited to 'openstackclient/tests/functional')
-rw-r--r--openstackclient/tests/functional/base.py2
-rw-r--r--openstackclient/tests/functional/compute/v2/test_keypair.py4
-rw-r--r--openstackclient/tests/functional/identity/v3/test_service_provider.py3
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_policy.py80
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule.py339
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py7
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_rbac.py85
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_segment.py144
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_service_provider.py9
-rw-r--r--openstackclient/tests/functional/network/v2/test_security_group.py70
-rw-r--r--openstackclient/tests/functional/network/v2/test_security_group_rule.py88
-rw-r--r--openstackclient/tests/functional/volume/v1/common.py27
12 files changed, 412 insertions, 446 deletions
diff --git a/openstackclient/tests/functional/base.py b/openstackclient/tests/functional/base.py
index 4c88b13e..90bbc24d 100644
--- a/openstackclient/tests/functional/base.py
+++ b/openstackclient/tests/functional/base.py
@@ -14,10 +14,10 @@ import os
import re
import shlex
import subprocess
-import testtools
from tempest.lib.cli import output_parser
from tempest.lib import exceptions
+import testtools
COMMON_DIR = os.path.dirname(os.path.abspath(__file__))
diff --git a/openstackclient/tests/functional/compute/v2/test_keypair.py b/openstackclient/tests/functional/compute/v2/test_keypair.py
index 1e1a03d6..9a88e66f 100644
--- a/openstackclient/tests/functional/compute/v2/test_keypair.py
+++ b/openstackclient/tests/functional/compute/v2/test_keypair.py
@@ -13,11 +13,11 @@
import json
import tempfile
-from openstackclient.tests.functional import base
-
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
+from openstackclient.tests.functional import base
+
class KeypairBase(base.TestCase):
"""Methods for functional tests."""
diff --git a/openstackclient/tests/functional/identity/v3/test_service_provider.py b/openstackclient/tests/functional/identity/v3/test_service_provider.py
index e072bc93..32b7a463 100644
--- a/openstackclient/tests/functional/identity/v3/test_service_provider.py
+++ b/openstackclient/tests/functional/identity/v3/test_service_provider.py
@@ -10,9 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-from openstackclient.tests.functional.identity.v3 import common
from tempest.lib.common.utils import data_utils
+from openstackclient.tests.functional.identity.v3 import common
+
class ServiceProviderTests(common.IdentityTests):
# Introduce functional test cases for command 'Service Provider'
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
index 53c15ecf..27c1b6aa 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -20,34 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
class NetworkQosPolicyTests(common.NetworkTests):
"""Functional tests for QoS policy"""
- HEADERS = ['Name']
- FIELDS = ['name']
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.NAME = uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- raw_output = cls.openstack(
- 'network qos policy create ' +
- cls.NAME +
- opts
- )
- cls.assertOutput(cls.NAME + "\n", raw_output)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos policy delete ' +
- cls.NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosPolicyTests, cls).tearDownClass()
def setUp(self):
super(NetworkQosPolicyTests, self).setUp()
@@ -55,33 +28,46 @@ class NetworkQosPolicyTests(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.NAME = uuid.uuid4().hex
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy create -f json ' +
+ self.NAME
+ ))
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.NAME)
+ self.assertEqual(self.NAME, cmd_output['name'])
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos policy delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ raw_output = self.openstack(
+ 'network qos policy delete ' + policy_name)
+ self.assertEqual('', raw_output)
+
def test_qos_policy_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos policy list' + opts)
- self.assertIn(self.NAME, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy list -f json'))
+ self.assertIn(self.NAME, [p['Name'] for p in cmd_output])
def test_qos_policy_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual(self.NAME + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertEqual(self.NAME, cmd_output['name'])
def test_qos_policy_set(self):
self.openstack('network qos policy set --share ' + self.NAME)
- opts = self.get_opts(['shared'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("True\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertTrue(cmd_output['shared'])
def test_qos_policy_default(self):
self.openstack('network qos policy set --default ' + self.NAME)
- opts = self.get_opts(['is_default'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("True\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertTrue(cmd_output['is_default'])
self.openstack('network qos policy set --no-default ' + self.NAME)
- opts = self.get_opts(['is_default'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("False\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertFalse(cmd_output['is_default'])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
index 8b34422f..770abe94 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -20,51 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
"""Functional tests for QoS minimum bandwidth rule"""
- RULE_ID = None
- MIN_KBPS = 2800
- MIN_KBPS_MODIFIED = 7500
- DIRECTION = '--egress'
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'minimum-bandwidth'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--min-kbps ' + str(cls.MIN_KBPS) + ' ' +
- cls.DIRECTION + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' +
- cls.QOS_POLICY_NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsMinimumBandwidth, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsMinimumBandwidth, self).setUp()
@@ -72,72 +28,67 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type minimum-bandwidth ' +
+ '--min-kbps 2800 ' +
+ '--egress ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' +
+ self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type minimum-bandwidth ' +
+ '--min-kbps 2800 ' +
+ '--egress ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json ' + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --min-kbps ' +
- str(self.MIN_KBPS_MODIFIED) + ' ' +
+ self.openstack('network qos rule set --min-kbps 7500 ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['min_kbps'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(7500, cmd_output['min_kbps'])
class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
"""Functional tests for QoS DSCP marking rule"""
- RULE_ID = None
- QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
- DSCP_MARK = 8
- DSCP_MARK_MODIFIED = 32
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'dscp-marking'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--dscp-mark ' + str(cls.DSCP_MARK) + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' + cls.QOS_POLICY_NAME)
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsDSCPMarking, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsDSCPMarking, self).setUp()
@@ -145,78 +96,63 @@ class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type dscp-marking ' +
+ '--dscp-mark 8 ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type dscp-marking ' +
+ '--dscp-mark 8 ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json ' + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --dscp-mark ' +
- str(self.DSCP_MARK_MODIFIED) + ' ' +
+ self.openstack('network qos rule set --dscp-mark 32 ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['dscp_mark'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(32, cmd_output['dscp_mark'])
class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
"""Functional tests for QoS bandwidth limit rule"""
- RULE_ID = None
- QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
- MAX_KBPS = 10000
- MAX_KBPS_MODIFIED = 15000
- MAX_BURST_KBITS = 1400
- MAX_BURST_KBITS_MODIFIED = 1800
- RULE_DIRECTION = 'egress'
- RULE_DIRECTION_MODIFIED = 'ingress'
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'bandwidth-limit'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--max-kbps ' + str(cls.MAX_KBPS) + ' ' +
- '--max-burst-kbits ' + str(cls.MAX_BURST_KBITS) + ' ' +
- '--' + cls.RULE_DIRECTION + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' + cls.QOS_POLICY_NAME)
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsBandwidthLimit, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsBandwidthLimit, self).setUp()
@@ -224,30 +160,65 @@ class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type bandwidth-limit ' +
+ '--max-kbps 10000 ' +
+ '--max-burst-kbits 1400 ' +
+ '--egress ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type bandwidth-limit ' +
+ '--max-kbps 10000 ' +
+ '--max-burst-kbits 1400 ' +
+ '--egress ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json '
+ + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --max-kbps ' +
- str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
- str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
- '--' + self.RULE_DIRECTION_MODIFIED + ' ' +
+ self.openstack('network qos rule set --max-kbps 15000 ' +
+ '--max-burst-kbits 1800 ' +
+ '--ingress ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['direction', 'max_burst_kbps', 'max_kbps'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- expected = (str(self.RULE_DIRECTION_MODIFIED) + "\n" +
- str(self.MAX_BURST_KBITS_MODIFIED) + "\n" +
- str(self.MAX_KBPS_MODIFIED) + "\n")
- self.assertEqual(expected, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(15000, cmd_output['max_kbps'])
+ self.assertEqual(1800, cmd_output['max_burst_kbps'])
+ self.assertEqual('ingress', cmd_output['direction'])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
index d7612936..a6ee3e10 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
+
from openstackclient.tests.functional.network.v2 import common
@@ -29,6 +31,7 @@ class NetworkQosRuleTypeTests(common.NetworkTests):
self.skipTest("No Network service present")
def test_qos_rule_type_list(self):
- raw_output = self.openstack('network qos rule type list')
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule type list -f json'))
for rule_type in self.AVAILABLE_RULE_TYPES:
- self.assertIn(rule_type, raw_output)
+ self.assertIn(rule_type, [x['Type'] for x in cmd_output])
diff --git a/openstackclient/tests/functional/network/v2/test_network_rbac.py b/openstackclient/tests/functional/network/v2/test_network_rbac.py
index 2206761f..3bbe4f27 100644
--- a/openstackclient/tests/functional/network/v2/test_network_rbac.py
+++ b/openstackclient/tests/functional/network/v2/test_network_rbac.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -22,69 +23,51 @@ class NetworkRBACTests(common.NetworkTests):
HEADERS = ['ID']
FIELDS = ['id']
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.NET_NAME = uuid.uuid4().hex
- cls.PROJECT_NAME = uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- raw_output = cls.openstack(
- 'network create ' + cls.NET_NAME + opts
- )
- cls.OBJECT_ID = raw_output.strip('\n')
- opts = cls.get_opts(['id', 'object_id'])
- raw_output = cls.openstack(
- 'network rbac create ' +
- cls.OBJECT_ID +
- ' --action access_as_shared' +
- ' --target-project admin' +
- ' --type network' + opts
- )
- cls.ID, object_id, rol = tuple(raw_output.split('\n'))
- cls.assertOutput(cls.OBJECT_ID, object_id)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output_rbac = cls.openstack(
- 'network rbac delete ' + cls.ID
- )
- raw_output_network = cls.openstack(
- 'network delete ' + cls.OBJECT_ID
- )
- cls.assertOutput('', raw_output_rbac)
- cls.assertOutput('', raw_output_network)
- finally:
- super(NetworkRBACTests, cls).tearDownClass()
-
def setUp(self):
super(NetworkRBACTests, self).setUp()
# Nothing in this class works with Nova Network
if not self.haz_network:
self.skipTest("No Network service present")
+ self.NET_NAME = uuid.uuid4().hex
+ self.PROJECT_NAME = uuid.uuid4().hex
+
+ cmd_output = json.loads(self.openstack(
+ 'network create -f json ' + self.NET_NAME
+ ))
+ self.addCleanup(self.openstack,
+ 'network delete ' + cmd_output['id'])
+ self.OBJECT_ID = cmd_output['id']
+
+ cmd_output = json.loads(self.openstack(
+ 'network rbac create -f json ' +
+ self.OBJECT_ID +
+ ' --action access_as_shared' +
+ ' --target-project admin' +
+ ' --type network'
+ ))
+ self.addCleanup(self.openstack,
+ 'network rbac delete ' + cmd_output['id'])
+ self.ID = cmd_output['id']
+ self.assertEqual(self.OBJECT_ID, cmd_output['object_id'])
+
def test_network_rbac_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network rbac list' + opts)
- self.assertIn(self.ID, raw_output)
+ cmd_output = json.loads(self.openstack('network rbac list -f json'))
+ self.assertIn(self.ID, [rbac['ID'] for rbac in cmd_output])
def test_network_rbac_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network rbac show ' + self.ID + opts)
- self.assertEqual(self.ID + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network rbac show -f json ' + self.ID))
+ self.assertEqual(self.ID, cmd_output['id'])
def test_network_rbac_set(self):
- opts = self.get_opts(self.FIELDS)
- project_id = self.openstack(
- 'project create ' + self.PROJECT_NAME + opts)
+ project_id = json.loads(self.openstack(
+ 'project create -f json ' + self.PROJECT_NAME))['id']
self.openstack('network rbac set ' + self.ID +
' --target-project ' + self.PROJECT_NAME)
- opts = self.get_opts(['target_project_id'])
- raw_output_rbac = self.openstack('network rbac show ' + self.ID + opts)
+ cmd_output_rbac = json.loads(self.openstack(
+ 'network rbac show -f json ' + self.ID))
+ self.assertEqual(project_id, cmd_output_rbac['target_project_id'])
raw_output_project = self.openstack(
'project delete ' + self.PROJECT_NAME)
- self.assertEqual(project_id, raw_output_rbac)
- self.assertOutput('', raw_output_project)
+ self.assertEqual('', raw_output_project)
diff --git a/openstackclient/tests/functional/network/v2/test_network_segment.py b/openstackclient/tests/functional/network/v2/test_network_segment.py
index e1dbc7a0..8940273f 100644
--- a/openstackclient/tests/functional/network/v2/test_network_segment.py
+++ b/openstackclient/tests/functional/network/v2/test_network_segment.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -18,64 +19,117 @@ from openstackclient.tests.functional.network.v2 import common
class NetworkSegmentTests(common.NetworkTests):
"""Functional tests for network segment"""
+ @classmethod
+ def setUpClass(cls):
+ common.NetworkTests.setUpClass()
+ if cls.haz_network:
+ cls.NETWORK_NAME = uuid.uuid4().hex
+ cls.PHYSICAL_NETWORK_NAME = uuid.uuid4().hex
+
+ # Create a network for the all subnet tests
+ cmd_output = json.loads(cls.openstack(
+ 'network create -f json ' +
+ cls.NETWORK_NAME
+ ))
+ # Get network_id for assertEqual
+ cls.NETWORK_ID = cmd_output["id"]
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ if cls.haz_network:
+ raw_output = cls.openstack(
+ 'network delete ' +
+ cls.NETWORK_NAME
+ )
+ cls.assertOutput('', raw_output)
+ finally:
+ super(NetworkSegmentTests, cls).tearDownClass()
+
def setUp(self):
super(NetworkSegmentTests, self).setUp()
# Nothing in this class works with Nova Network
if not self.haz_network:
self.skipTest("No Network service present")
- self.NETWORK_NAME = uuid.uuid4().hex
- self.PHYSICAL_NETWORK_NAME = uuid.uuid4().hex
-
- # Create a network for the segment
- opts = self.get_opts(['id'])
- raw_output = self.openstack(
- 'network create ' + self.NETWORK_NAME + opts
- )
- self.addCleanup(self.openstack,
- 'network delete ' + self.NETWORK_NAME)
- self.NETWORK_ID = raw_output.strip('\n')
-
- # Get the segment for the network.
- opts = self.get_opts(['ID', 'Network'])
- raw_output = self.openstack(
- 'network segment list '
- '--network ' + self.NETWORK_NAME + ' ' +
- opts
+ def test_network_segment_create_delete(self):
+ name = uuid.uuid4().hex
+ json_output = json.loads(self.openstack(
+ ' network segment create -f json ' +
+ '--network ' + self.NETWORK_ID + ' ' +
+ '--network-type geneve ' +
+ '--segment 2055 ' +
+ name
+ ))
+ self.assertEqual(
+ name,
+ json_output["name"],
)
- raw_output_row = raw_output.split('\n')[0]
- self.NETWORK_SEGMENT_ID = raw_output_row.split(' ')[0]
- def test_network_segment_create_delete(self):
- opts = self.get_opts(['id'])
raw_output = self.openstack(
- ' network segment create --network ' + self.NETWORK_ID +
- ' --network-type geneve ' +
- ' --segment 2055 test_segment ' + opts
+ 'network segment delete ' + name,
)
- network_segment_id = raw_output.strip('\n')
- raw_output = self.openstack('network segment delete ' +
- network_segment_id)
self.assertOutput('', raw_output)
def test_network_segment_list(self):
- opts = self.get_opts(['ID'])
- raw_output = self.openstack('network segment list' + opts)
- self.assertIn(self.NETWORK_SEGMENT_ID, raw_output)
+ name = uuid.uuid4().hex
+ json_output = json.loads(self.openstack(
+ ' network segment create -f json ' +
+ '--network ' + self.NETWORK_ID + ' ' +
+ '--network-type geneve ' +
+ '--segment 2055 ' +
+ name
+ ))
+ network_segment_id = json_output.get('id')
+ network_segment_name = json_output.get('name')
+ self.addCleanup(
+ self.openstack,
+ 'network segment delete ' + network_segment_id
+ )
+ self.assertEqual(
+ name,
+ json_output["name"],
+ )
+
+ json_output = json.loads(self.openstack(
+ 'network segment list -f json'
+ ))
+ item_map = {
+ item.get('ID'): item.get('Name') for item in json_output
+ }
+ self.assertIn(network_segment_id, item_map.keys())
+ self.assertIn(network_segment_name, item_map.values())
+
+ def test_network_segment_set_show(self):
+ name = uuid.uuid4().hex
+ json_output = json.loads(self.openstack(
+ ' network segment create -f json ' +
+ '--network ' + self.NETWORK_ID + ' ' +
+ '--network-type geneve ' +
+ '--segment 2055 ' +
+ name
+ ))
+ self.addCleanup(
+ self.openstack,
+ 'network segment delete ' + name
+ )
+ self.assertIsNone(
+ json_output["description"],
+ )
- def test_network_segment_set(self):
new_description = 'new_description'
- raw_output = self.openstack('network segment set ' +
- '--description ' + new_description +
- ' ' + self.NETWORK_SEGMENT_ID)
- self.assertOutput('', raw_output)
- opts = self.get_opts(['description'])
- raw_output = self.openstack('network segment show ' +
- self.NETWORK_SEGMENT_ID + opts)
- self.assertEqual(new_description + "\n", raw_output)
+ cmd_output = self.openstack(
+ 'network segment set ' +
+ '--description ' + new_description + ' ' +
+ name
+ )
+ self.assertOutput('', cmd_output)
- def test_network_segment_show(self):
- opts = self.get_opts(['network_id'])
- raw_output = self.openstack('network segment show ' +
- self.NETWORK_SEGMENT_ID + opts)
- self.assertEqual(self.NETWORK_ID + "\n", raw_output)
+ json_output = json.loads(self.openstack(
+ 'network segment show -f json ' +
+ name
+ ))
+ self.assertEqual(
+ new_description,
+ json_output["description"],
+ )
diff --git a/openstackclient/tests/functional/network/v2/test_network_service_provider.py b/openstackclient/tests/functional/network/v2/test_network_service_provider.py
index 8ed44dd9..999b7eb7 100644
--- a/openstackclient/tests/functional/network/v2/test_network_service_provider.py
+++ b/openstackclient/tests/functional/network/v2/test_network_service_provider.py
@@ -13,14 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
+
from openstackclient.tests.functional.network.v2 import common
class TestNetworkServiceProvider(common.NetworkTests):
"""Functional tests for network service provider"""
- SERVICE_TYPE = 'L3_ROUTER_NAT'
-
def setUp(self):
super(TestNetworkServiceProvider, self).setUp()
# Nothing in this class works with Nova Network
@@ -28,5 +28,6 @@ class TestNetworkServiceProvider(common.NetworkTests):
self.skipTest("No Network service present")
def test_network_service_provider_list(self):
- raw_output = self.openstack('network service provider list')
- self.assertIn(self.SERVICE_TYPE, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network service provider list -f json'))
+ self.assertIn('L3_ROUTER_NAT', [x['Service Type'] for x in cmd_output])
diff --git a/openstackclient/tests/functional/network/v2/test_security_group.py b/openstackclient/tests/functional/network/v2/test_security_group.py
index b601c913..8ae24b72 100644
--- a/openstackclient/tests/functional/network/v2/test_security_group.py
+++ b/openstackclient/tests/functional/network/v2/test_security_group.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -17,44 +18,6 @@ from openstackclient.tests.functional.network.v2 import common
class SecurityGroupTests(common.NetworkTests):
"""Functional tests for security group"""
- HEADERS = ['Name']
- FIELDS = ['name']
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.NAME = uuid.uuid4().hex
- cls.OTHER_NAME = uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- raw_output = cls.openstack(
- 'security group create ' +
- cls.NAME +
- opts
- )
- expected = cls.NAME + '\n'
- cls.assertOutput(expected, raw_output)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- # Rename test
- raw_output = cls.openstack(
- 'security group set --name ' +
- cls.OTHER_NAME + ' ' +
- cls.NAME
- )
- cls.assertOutput('', raw_output)
- # Delete test
- raw_output = cls.openstack(
- 'security group delete ' +
- cls.OTHER_NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(SecurityGroupTests, cls).tearDownClass()
def setUp(self):
super(SecurityGroupTests, self).setUp()
@@ -62,22 +25,33 @@ class SecurityGroupTests(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.NAME = uuid.uuid4().hex
+ self.OTHER_NAME = uuid.uuid4().hex
+ cmd_output = json.loads(self.openstack(
+ 'security group create -f json ' +
+ self.NAME
+ ))
+ self.addCleanup(self.openstack,
+ 'security group delete ' + cmd_output['id'])
+ self.assertEqual(self.NAME, cmd_output['name'])
+
def test_security_group_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('security group list' + opts)
- self.assertIn(self.NAME, raw_output)
+ cmd_output = json.loads(self.openstack('security group list -f json'))
+ self.assertIn(self.NAME, [sg['Name'] for sg in cmd_output])
def test_security_group_set(self):
+ other_name = uuid.uuid4().hex
raw_output = self.openstack(
- 'security group set --description NSA ' + self.NAME
+ 'security group set --description NSA --name ' +
+ other_name + ' ' + self.NAME
)
self.assertEqual('', raw_output)
- opts = self.get_opts(['description'])
- raw_output = self.openstack('security group show ' + self.NAME + opts)
- self.assertEqual("NSA\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'security group show -f json ' + other_name))
+ self.assertEqual('NSA', cmd_output['description'])
def test_security_group_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('security group show ' + self.NAME + opts)
- self.assertEqual(self.NAME + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'security group show -f json ' + self.NAME))
+ self.assertEqual(self.NAME, cmd_output['name'])
diff --git a/openstackclient/tests/functional/network/v2/test_security_group_rule.py b/openstackclient/tests/functional/network/v2/test_security_group_rule.py
index 40951a01..fe78bf47 100644
--- a/openstackclient/tests/functional/network/v2/test_security_group_rule.py
+++ b/openstackclient/tests/functional/network/v2/test_security_group_rule.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -17,54 +18,6 @@ from openstackclient.tests.functional.network.v2 import common
class SecurityGroupRuleTests(common.NetworkTests):
"""Functional tests for security group rule"""
- SECURITY_GROUP_RULE_ID = None
- NAME_FIELD = ['name']
- ID_FIELD = ['id']
- ID_HEADER = ['ID']
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.SECURITY_GROUP_NAME = uuid.uuid4().hex
-
- # Create the security group to hold the rule
- opts = cls.get_opts(cls.NAME_FIELD)
- raw_output = cls.openstack(
- 'security group create ' +
- cls.SECURITY_GROUP_NAME +
- opts
- )
- expected = cls.SECURITY_GROUP_NAME + '\n'
- cls.assertOutput(expected, raw_output)
-
- # Create the security group rule.
- opts = cls.get_opts(cls.ID_FIELD)
- raw_output = cls.openstack(
- 'security group rule create ' +
- cls.SECURITY_GROUP_NAME + ' ' +
- '--protocol tcp --dst-port 80:80 ' +
- '--ingress --ethertype IPv4 ' +
- opts
- )
- cls.SECURITY_GROUP_RULE_ID = raw_output.strip('\n')
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'security group rule delete ' +
- cls.SECURITY_GROUP_RULE_ID
- )
- cls.assertOutput('', raw_output)
- raw_output = cls.openstack(
- 'security group delete ' +
- cls.SECURITY_GROUP_NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(SecurityGroupRuleTests, cls).tearDownClass()
def setUp(self):
super(SecurityGroupRuleTests, self).setUp()
@@ -72,16 +25,35 @@ class SecurityGroupRuleTests(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.SECURITY_GROUP_NAME = uuid.uuid4().hex
+
+ # Create the security group to hold the rule
+ cmd_output = json.loads(self.openstack(
+ 'security group create -f json ' +
+ self.SECURITY_GROUP_NAME
+ ))
+ self.addCleanup(self.openstack,
+ 'security group delete ' + self.SECURITY_GROUP_NAME)
+ self.assertEqual(self.SECURITY_GROUP_NAME, cmd_output['name'])
+
+ # Create the security group rule.
+ cmd_output = json.loads(self.openstack(
+ 'security group rule create -f json ' +
+ self.SECURITY_GROUP_NAME + ' ' +
+ '--protocol tcp --dst-port 80:80 ' +
+ '--ingress --ethertype IPv4 '
+ ))
+ self.addCleanup(self.openstack,
+ 'security group rule delete ' + cmd_output['id'])
+ self.SECURITY_GROUP_RULE_ID = cmd_output['id']
+
def test_security_group_rule_list(self):
- opts = self.get_opts(self.ID_HEADER)
- raw_output = self.openstack('security group rule list ' +
- self.SECURITY_GROUP_NAME +
- opts)
- self.assertIn(self.SECURITY_GROUP_RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'security group rule list -f json ' + self.SECURITY_GROUP_NAME))
+ self.assertIn(self.SECURITY_GROUP_RULE_ID,
+ [rule['ID'] for rule in cmd_output])
def test_security_group_rule_show(self):
- opts = self.get_opts(self.ID_FIELD)
- raw_output = self.openstack('security group rule show ' +
- self.SECURITY_GROUP_RULE_ID +
- opts)
- self.assertEqual(self.SECURITY_GROUP_RULE_ID + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'security group rule show -f json ' + self.SECURITY_GROUP_RULE_ID))
+ self.assertEqual(self.SECURITY_GROUP_RULE_ID, cmd_output['id'])
diff --git a/openstackclient/tests/functional/volume/v1/common.py b/openstackclient/tests/functional/volume/v1/common.py
index 4978cea3..bb3c674e 100644
--- a/openstackclient/tests/functional/volume/v1/common.py
+++ b/openstackclient/tests/functional/volume/v1/common.py
@@ -12,14 +12,35 @@
import fixtures
-from openstackclient.tests.functional.volume import base
+from openstackclient.tests.functional import base
+from openstackclient.tests.functional.volume import base as volume_base
-class BaseVolumeTests(base.BaseVolumeTests):
- """Base class for Volume functional tests. """
+class BaseVolumeTests(volume_base.BaseVolumeTests):
+ """Base class for Volume functional tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(BaseVolumeTests, cls).setUpClass()
+ # TODO(dtroyer): This needs to be updated to specifically check for
+ # Volume v1 rather than just 'volume', but for now
+ # that is enough until we get proper version negotiation
+ cls.haz_volume_v1 = base.is_service_enabled('volume')
def setUp(self):
super(BaseVolumeTests, self).setUp()
+
+ # This class requires Volume v1
+ # if not self.haz_volume_v1:
+ # self.skipTest("No Volume v1 service present")
+
+ # TODO(dtroyer): We really want the above to work but right now
+ # (12Sep2017) DevStack still creates a 'volume'
+ # service type even though there is no service behind
+ # it. Until that is fixed we need to just skip the
+ # volume v1 functional tests in master.
+ self.skipTest("No Volume v1 service present")
+
ver_fixture = fixtures.EnvironmentVariable(
'OS_VOLUME_API_VERSION', '1'
)