summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
authorAkihiro Motoki <amotoki@gmail.com>2017-08-23 21:15:12 +0000
committerAkihiro Motoki <amotoki@gmail.com>2017-08-23 21:15:12 +0000
commitfe8a50b6b1448038073c47c55b46fc4ebf4a6080 (patch)
tree3b6681ad351932c7feecb336e17a0048db389b51 /openstackclient
parent95e279176b05e0096988ae596f1e02adda248935 (diff)
downloadpython-openstackclient-fe8a50b6b1448038073c47c55b46fc4ebf4a6080.tar.gz
Convert network qos functional tests to JSON
Change-Id: Ie5cde2f927ec6abb6334ea01adfb06749384ed01
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_policy.py80
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule.py339
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py7
3 files changed, 193 insertions, 233 deletions
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
index 53c15ecf..27c1b6aa 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -20,34 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
class NetworkQosPolicyTests(common.NetworkTests):
"""Functional tests for QoS policy"""
- HEADERS = ['Name']
- FIELDS = ['name']
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.NAME = uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- raw_output = cls.openstack(
- 'network qos policy create ' +
- cls.NAME +
- opts
- )
- cls.assertOutput(cls.NAME + "\n", raw_output)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos policy delete ' +
- cls.NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosPolicyTests, cls).tearDownClass()
def setUp(self):
super(NetworkQosPolicyTests, self).setUp()
@@ -55,33 +28,46 @@ class NetworkQosPolicyTests(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.NAME = uuid.uuid4().hex
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy create -f json ' +
+ self.NAME
+ ))
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.NAME)
+ self.assertEqual(self.NAME, cmd_output['name'])
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos policy delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ raw_output = self.openstack(
+ 'network qos policy delete ' + policy_name)
+ self.assertEqual('', raw_output)
+
def test_qos_policy_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos policy list' + opts)
- self.assertIn(self.NAME, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy list -f json'))
+ self.assertIn(self.NAME, [p['Name'] for p in cmd_output])
def test_qos_policy_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual(self.NAME + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertEqual(self.NAME, cmd_output['name'])
def test_qos_policy_set(self):
self.openstack('network qos policy set --share ' + self.NAME)
- opts = self.get_opts(['shared'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("True\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertTrue(cmd_output['shared'])
def test_qos_policy_default(self):
self.openstack('network qos policy set --default ' + self.NAME)
- opts = self.get_opts(['is_default'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("True\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertTrue(cmd_output['is_default'])
self.openstack('network qos policy set --no-default ' + self.NAME)
- opts = self.get_opts(['is_default'])
- raw_output = self.openstack('network qos policy show ' + self.NAME +
- opts)
- self.assertEqual("False\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos policy show -f json ' + self.NAME))
+ self.assertFalse(cmd_output['is_default'])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
index 8b34422f..770abe94 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
import uuid
from openstackclient.tests.functional.network.v2 import common
@@ -20,51 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
"""Functional tests for QoS minimum bandwidth rule"""
- RULE_ID = None
- MIN_KBPS = 2800
- MIN_KBPS_MODIFIED = 7500
- DIRECTION = '--egress'
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'minimum-bandwidth'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- cls.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
-
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--min-kbps ' + str(cls.MIN_KBPS) + ' ' +
- cls.DIRECTION + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' +
- cls.QOS_POLICY_NAME
- )
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsMinimumBandwidth, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsMinimumBandwidth, self).setUp()
@@ -72,72 +28,67 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type minimum-bandwidth ' +
+ '--min-kbps 2800 ' +
+ '--egress ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' +
+ self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type minimum-bandwidth ' +
+ '--min-kbps 2800 ' +
+ '--egress ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json ' + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --min-kbps ' +
- str(self.MIN_KBPS_MODIFIED) + ' ' +
+ self.openstack('network qos rule set --min-kbps 7500 ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['min_kbps'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(7500, cmd_output['min_kbps'])
class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
"""Functional tests for QoS DSCP marking rule"""
- RULE_ID = None
- QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
- DSCP_MARK = 8
- DSCP_MARK_MODIFIED = 32
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'dscp-marking'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--dscp-mark ' + str(cls.DSCP_MARK) + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' + cls.QOS_POLICY_NAME)
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsDSCPMarking, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsDSCPMarking, self).setUp()
@@ -145,78 +96,63 @@ class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type dscp-marking ' +
+ '--dscp-mark 8 ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type dscp-marking ' +
+ '--dscp-mark 8 ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json ' + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --dscp-mark ' +
- str(self.DSCP_MARK_MODIFIED) + ' ' +
+ self.openstack('network qos rule set --dscp-mark 32 ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['dscp_mark'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(32, cmd_output['dscp_mark'])
class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
"""Functional tests for QoS bandwidth limit rule"""
- RULE_ID = None
- QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
- MAX_KBPS = 10000
- MAX_KBPS_MODIFIED = 15000
- MAX_BURST_KBITS = 1400
- MAX_BURST_KBITS_MODIFIED = 1800
- RULE_DIRECTION = 'egress'
- RULE_DIRECTION_MODIFIED = 'ingress'
- HEADERS = ['ID']
- FIELDS = ['id']
- TYPE = 'bandwidth-limit'
-
- @classmethod
- def setUpClass(cls):
- common.NetworkTests.setUpClass()
- if cls.haz_network:
- opts = cls.get_opts(cls.FIELDS)
- cls.openstack(
- 'network qos policy create ' +
- cls.QOS_POLICY_NAME
- )
- cls.RULE_ID = cls.openstack(
- 'network qos rule create ' +
- '--type ' + cls.TYPE + ' ' +
- '--max-kbps ' + str(cls.MAX_KBPS) + ' ' +
- '--max-burst-kbits ' + str(cls.MAX_BURST_KBITS) + ' ' +
- '--' + cls.RULE_DIRECTION + ' ' +
- cls.QOS_POLICY_NAME +
- opts
- )
- cls.assertsOutputNotNone(cls.RULE_ID)
-
- @classmethod
- def tearDownClass(cls):
- try:
- if cls.haz_network:
- raw_output = cls.openstack(
- 'network qos rule delete ' +
- cls.QOS_POLICY_NAME + ' ' +
- cls.RULE_ID
- )
- cls.openstack(
- 'network qos policy delete ' + cls.QOS_POLICY_NAME)
- cls.assertOutput('', raw_output)
- finally:
- super(NetworkQosRuleTestsBandwidthLimit, cls).tearDownClass()
def setUp(self):
super(NetworkQosRuleTestsBandwidthLimit, self).setUp()
@@ -224,30 +160,65 @@ class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
if not self.haz_network:
self.skipTest("No Network service present")
+ self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ self.openstack(
+ 'network qos policy create ' +
+ self.QOS_POLICY_NAME
+ )
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + self.QOS_POLICY_NAME)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type bandwidth-limit ' +
+ '--max-kbps 10000 ' +
+ '--max-burst-kbits 1400 ' +
+ '--egress ' +
+ self.QOS_POLICY_NAME
+ ))
+ self.RULE_ID = cmd_output['id']
+ self.addCleanup(self.openstack,
+ 'network qos rule delete ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ self.assertTrue(self.RULE_ID)
+
+ def test_qos_rule_create_delete(self):
+ # This is to check the output of qos rule delete
+ policy_name = uuid.uuid4().hex
+ self.openstack('network qos policy create -f json ' + policy_name)
+ self.addCleanup(self.openstack,
+ 'network qos policy delete ' + policy_name)
+ rule = json.loads(self.openstack(
+ 'network qos rule create -f json ' +
+ '--type bandwidth-limit ' +
+ '--max-kbps 10000 ' +
+ '--max-burst-kbits 1400 ' +
+ '--egress ' + policy_name
+ ))
+ raw_output = self.openstack(
+ 'network qos rule delete ' +
+ policy_name + ' ' + rule['id'])
+ self.assertEqual('', raw_output)
+
def test_qos_rule_list(self):
- opts = self.get_opts(self.HEADERS)
- raw_output = self.openstack('network qos rule list '
- + self.QOS_POLICY_NAME + opts)
- self.assertIn(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule list -f json '
+ + self.QOS_POLICY_NAME))
+ self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
def test_qos_rule_show(self):
- opts = self.get_opts(self.FIELDS)
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- self.assertEqual(self.RULE_ID, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(self.RULE_ID, cmd_output['id'])
def test_qos_rule_set(self):
- self.openstack('network qos rule set --max-kbps ' +
- str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
- str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
- '--' + self.RULE_DIRECTION_MODIFIED + ' ' +
+ self.openstack('network qos rule set --max-kbps 15000 ' +
+ '--max-burst-kbits 1800 ' +
+ '--ingress ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
- opts = self.get_opts(['direction', 'max_burst_kbps', 'max_kbps'])
- raw_output = self.openstack('network qos rule show ' +
- self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
- opts)
- expected = (str(self.RULE_DIRECTION_MODIFIED) + "\n" +
- str(self.MAX_BURST_KBITS_MODIFIED) + "\n" +
- str(self.MAX_KBPS_MODIFIED) + "\n")
- self.assertEqual(expected, raw_output)
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule show -f json ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
+ self.assertEqual(15000, cmd_output['max_kbps'])
+ self.assertEqual(1800, cmd_output['max_burst_kbps'])
+ self.assertEqual('ingress', cmd_output['direction'])
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
index d7612936..a6ee3e10 100644
--- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
+
from openstackclient.tests.functional.network.v2 import common
@@ -29,6 +31,7 @@ class NetworkQosRuleTypeTests(common.NetworkTests):
self.skipTest("No Network service present")
def test_qos_rule_type_list(self):
- raw_output = self.openstack('network qos rule type list')
+ cmd_output = json.loads(self.openstack(
+ 'network qos rule type list -f json'))
for rule_type in self.AVAILABLE_RULE_TYPES:
- self.assertIn(rule_type, raw_output)
+ self.assertIn(rule_type, [x['Type'] for x in cmd_output])