summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/network/v2/network_qos_rule.py356
-rw-r--r--openstackclient/tests/functional/base.py5
-rw-r--r--openstackclient/tests/functional/network/v2/test_network_qos_rule.py181
-rw-r--r--openstackclient/tests/unit/fakes.py6
-rw-r--r--openstackclient/tests/unit/network/v2/fakes.py162
-rw-r--r--openstackclient/tests/unit/network/v2/test_network_qos_rule.py997
6 files changed, 1634 insertions, 73 deletions
diff --git a/openstackclient/network/v2/network_qos_rule.py b/openstackclient/network/v2/network_qos_rule.py
new file mode 100644
index 00000000..054d16b4
--- /dev/null
+++ b/openstackclient/network/v2/network_qos_rule.py
@@ -0,0 +1,356 @@
+# Copyright (c) 2016, Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import itertools
+import logging
+import six
+
+from osc_lib.command import command
+from osc_lib import exceptions
+from osc_lib import utils
+
+from openstackclient.i18n import _
+from openstackclient.network import sdk_utils
+
+
+LOG = logging.getLogger(__name__)
+
+RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
+RULE_TYPE_DSCP_MARKING = 'dscp-marking'
+RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+REQUIRED_PARAMETERS = {
+ RULE_TYPE_MINIMUM_BANDWIDTH: ['min_kbps', 'direction'],
+ RULE_TYPE_DSCP_MARKING: ['dscp_mark'],
+ RULE_TYPE_BANDWIDTH_LIMIT: ['max_kbps', 'max_burst_kbps']}
+DIRECTION_EGRESS = 'egress'
+DIRECTION_INGRESS = 'ingress'
+DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
+ 34, 36, 38, 40, 46, 48, 56]
+
+ACTION_CREATE = 'create'
+ACTION_DELETE = 'delete'
+ACTION_FIND = 'find'
+ACTION_SET = 'update'
+ACTION_SHOW = 'get'
+
+
+def _get_columns(item):
+ column_map = {
+ 'tenant_id': 'project_id',
+ }
+ return sdk_utils.get_osc_show_columns_for_sdk_resource(item, column_map)
+
+
+def _check_type_parameters(attrs, type, is_create):
+ req_params = REQUIRED_PARAMETERS[type]
+ notreq_params = list(itertools.chain(
+ *[v for k, v in six.iteritems(REQUIRED_PARAMETERS) if k != type]))
+ if is_create and None in map(attrs.get, req_params):
+ msg = (_('"Create" rule command for type "%(rule_type)s" requires '
+ 'arguments %(args)s') % {'rule_type': type,
+ 'args': ", ".join(req_params)})
+ raise exceptions.CommandError(msg)
+ if set(six.iterkeys(attrs)) & set(notreq_params):
+ msg = (_('Rule type "%(rule_type)s" only requires arguments %(args)s')
+ % {'rule_type': type, 'args': ", ".join(req_params)})
+ raise exceptions.CommandError(msg)
+
+
+def _get_attrs(network_client, parsed_args, is_create=False):
+ attrs = {}
+ qos = network_client.find_qos_policy(parsed_args.qos_policy,
+ ignore_missing=False)
+ attrs['qos_policy_id'] = qos.id
+ if not is_create:
+ attrs['id'] = parsed_args.id
+ rule_type = _find_rule_type(qos, parsed_args.id)
+ if not rule_type:
+ msg = (_('Rule ID %(rule_id)s not found') %
+ {'rule_id': parsed_args.id})
+ raise exceptions.CommandError(msg)
+ else:
+ if not parsed_args.type:
+ msg = _('"Create" rule command requires argument "type"')
+ raise exceptions.CommandError(msg)
+ rule_type = parsed_args.type
+ if parsed_args.max_kbps:
+ attrs['max_kbps'] = parsed_args.max_kbps
+ if parsed_args.max_burst_kbits:
+ # NOTE(ralonsoh): this parameter must be changed in SDK and then in
+ # Neutron API, from 'max_burst_kbps' to
+ # 'max_burst_kbits'
+ attrs['max_burst_kbps'] = parsed_args.max_burst_kbits
+ if parsed_args.dscp_mark:
+ attrs['dscp_mark'] = parsed_args.dscp_mark
+ if parsed_args.min_kbps:
+ attrs['min_kbps'] = parsed_args.min_kbps
+ if parsed_args.ingress:
+ attrs['direction'] = 'ingress'
+ if parsed_args.egress:
+ attrs['direction'] = 'egress'
+ _check_type_parameters(attrs, rule_type, is_create)
+ return attrs
+
+
+def _get_item_properties(item, fields):
+ """Return a tuple containing the item properties."""
+ row = []
+ for field in fields:
+ row.append(item.get(field, ''))
+ return tuple(row)
+
+
+def _rule_action_call(client, action, rule_type):
+ rule_type = rule_type.replace('-', '_')
+ func_name = '%(action)s_qos_%(rule_type)s_rule' % {'action': action,
+ 'rule_type': rule_type}
+ return getattr(client, func_name)
+
+
+def _find_rule_type(qos, rule_id):
+ for rule in (r for r in qos.rules if r['id'] == rule_id):
+ return rule['type'].replace('_', '-')
+ return None
+
+
+def _add_rule_arguments(parser):
+ parser.add_argument(
+ '--max-kbps',
+ dest='max_kbps',
+ metavar='<max-kbps>',
+ type=int,
+ help=_('Maximum bandwidth in kbps')
+ )
+ parser.add_argument(
+ '--max-burst-kbits',
+ dest='max_burst_kbits',
+ metavar='<max-burst-kbits>',
+ type=int,
+ help=_('Maximum burst in kilobits, 0 means automatic')
+ )
+ parser.add_argument(
+ '--dscp-mark',
+ dest='dscp_mark',
+ metavar='<dscp-mark>',
+ type=int,
+ help=_('DSCP mark: value can be 0, even numbers from 8-56, '
+ 'excluding 42, 44, 50, 52, and 54')
+ )
+ parser.add_argument(
+ '--min-kbps',
+ dest='min_kbps',
+ metavar='<min-kbps>',
+ type=int,
+ help=_('Minimum guaranteed bandwidth in kbps')
+ )
+ direction_group = parser.add_mutually_exclusive_group()
+ direction_group.add_argument(
+ '--ingress',
+ action='store_true',
+ help=_("Ingress traffic direction from the project point of view")
+ )
+ direction_group.add_argument(
+ '--egress',
+ action='store_true',
+ help=_("Egress traffic direction from the project point of view")
+ )
+
+
+class CreateNetworkQosRule(command.ShowOne):
+ _description = _("Create new Network QoS rule")
+
+ def get_parser(self, prog_name):
+ parser = super(CreateNetworkQosRule, self).get_parser(
+ prog_name)
+ parser.add_argument(
+ 'qos_policy',
+ metavar='<qos-policy>',
+ help=_('QoS policy that contains the rule (name or ID)')
+ )
+ parser.add_argument(
+ '--type',
+ metavar='<type>',
+ choices=[RULE_TYPE_MINIMUM_BANDWIDTH,
+ RULE_TYPE_DSCP_MARKING,
+ RULE_TYPE_BANDWIDTH_LIMIT],
+ help=(_('QoS rule type (%s)') %
+ ", ".join(six.iterkeys(REQUIRED_PARAMETERS)))
+ )
+ _add_rule_arguments(parser)
+ return parser
+
+ def take_action(self, parsed_args):
+ network_client = self.app.client_manager.network
+ attrs = _get_attrs(network_client, parsed_args, is_create=True)
+ try:
+ obj = _rule_action_call(
+ network_client, ACTION_CREATE, parsed_args.type)(
+ attrs.pop('qos_policy_id'), **attrs)
+ except Exception as e:
+ msg = (_('Failed to create Network QoS rule: %(e)s') % {'e': e})
+ raise exceptions.CommandError(msg)
+ display_columns, columns = _get_columns(obj)
+ data = utils.get_item_properties(obj, columns)
+ return display_columns, data
+
+
+class DeleteNetworkQosRule(command.Command):
+ _description = _("Delete Network QoS rule")
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteNetworkQosRule, self).get_parser(prog_name)
+ parser.add_argument(
+ 'qos_policy',
+ metavar='<qos-policy>',
+ help=_('QoS policy that contains the rule (name or ID)')
+ )
+ parser.add_argument(
+ 'id',
+ metavar='<rule-id>',
+ help=_('Network QoS rule to delete (ID)')
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ network_client = self.app.client_manager.network
+ rule_id = parsed_args.id
+ try:
+ qos = network_client.find_qos_policy(parsed_args.qos_policy,
+ ignore_missing=False)
+ rule_type = _find_rule_type(qos, rule_id)
+ if not rule_type:
+ raise Exception('Rule %s not found' % rule_id)
+ _rule_action_call(network_client, ACTION_DELETE, rule_type)(
+ rule_id, qos.id)
+ except Exception as e:
+ msg = (_('Failed to delete Network QoS rule ID "%(rule)s": %(e)s')
+ % {'rule': rule_id, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ListNetworkQosRule(command.Lister):
+ _description = _("List Network QoS rules")
+
+ def get_parser(self, prog_name):
+ parser = super(ListNetworkQosRule, self).get_parser(prog_name)
+ parser.add_argument(
+ 'qos_policy',
+ metavar='<qos-policy>',
+ help=_('QoS policy that contains the rule (name or ID)')
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ client = self.app.client_manager.network
+ columns = (
+ 'id',
+ 'qos_policy_id',
+ 'type',
+ 'max_kbps',
+ 'max_burst_kbps',
+ 'min_kbps',
+ 'dscp_mark',
+ 'direction',
+ )
+ column_headers = (
+ 'ID',
+ 'QoS Policy ID',
+ 'Type',
+ 'Max Kbps',
+ 'Max Burst Kbits',
+ 'Min Kbps',
+ 'DSCP mark',
+ 'Direction',
+ )
+ qos = client.find_qos_policy(parsed_args.qos_policy,
+ ignore_missing=False)
+ data = qos.rules
+ return (column_headers,
+ (_get_item_properties(s, columns) for s in data))
+
+
+class SetNetworkQosRule(command.Command):
+ _description = _("Set Network QoS rule properties")
+
+ def get_parser(self, prog_name):
+ parser = super(SetNetworkQosRule, self).get_parser(prog_name)
+ parser.add_argument(
+ 'qos_policy',
+ metavar='<qos-policy>',
+ help=_('QoS policy that contains the rule (name or ID)')
+ )
+ parser.add_argument(
+ 'id',
+ metavar='<rule-id>',
+ help=_('Network QoS rule to delete (ID)')
+ )
+ _add_rule_arguments(parser)
+ return parser
+
+ def take_action(self, parsed_args):
+ network_client = self.app.client_manager.network
+ try:
+ qos = network_client.find_qos_policy(parsed_args.qos_policy,
+ ignore_missing=False)
+ rule_type = _find_rule_type(qos, parsed_args.id)
+ if not rule_type:
+ raise Exception('Rule not found')
+ attrs = _get_attrs(network_client, parsed_args)
+ qos_id = attrs.pop('qos_policy_id')
+ qos_rule = _rule_action_call(network_client, ACTION_FIND,
+ rule_type)(attrs.pop('id'), qos_id)
+ _rule_action_call(network_client, ACTION_SET, rule_type)(
+ qos_rule, qos_id, **attrs)
+ except Exception as e:
+ msg = (_('Failed to set Network QoS rule ID "%(rule)s": %(e)s') %
+ {'rule': parsed_args.id, 'e': e})
+ raise exceptions.CommandError(msg)
+
+
+class ShowNetworkQosRule(command.ShowOne):
+ _description = _("Display Network QoS rule details")
+
+ def get_parser(self, prog_name):
+ parser = super(ShowNetworkQosRule, self).get_parser(prog_name)
+ parser.add_argument(
+ 'qos_policy',
+ metavar='<qos-policy>',
+ help=_('QoS policy that contains the rule (name or ID)')
+ )
+ parser.add_argument(
+ 'id',
+ metavar='<rule-id>',
+ help=_('Network QoS rule to delete (ID)')
+ )
+ return parser
+
+ def take_action(self, parsed_args):
+ network_client = self.app.client_manager.network
+ rule_id = parsed_args.id
+ try:
+ qos = network_client.find_qos_policy(parsed_args.qos_policy,
+ ignore_missing=False)
+ rule_type = _find_rule_type(qos, rule_id)
+ if not rule_type:
+ raise Exception('Rule not found')
+ obj = _rule_action_call(network_client, ACTION_SHOW, rule_type)(
+ rule_id, qos.id)
+ except Exception as e:
+ msg = (_('Failed to set Network QoS rule ID "%(rule)s": %(e)s') %
+ {'rule': rule_id, 'e': e})
+ raise exceptions.CommandError(msg)
+ display_columns, columns = _get_columns(obj)
+ data = utils.get_item_properties(obj, columns)
+ return display_columns, data
diff --git a/openstackclient/tests/functional/base.py b/openstackclient/tests/functional/base.py
index 885abc02..fb78ea62 100644
--- a/openstackclient/tests/functional/base.py
+++ b/openstackclient/tests/functional/base.py
@@ -77,6 +77,11 @@ class TestCase(testtools.TestCase):
if expected not in actual:
raise Exception(expected + ' not in ' + actual)
+ @classmethod
+ def assertsOutputNotNone(cls, observed):
+ if observed is None:
+ raise Exception('No output observed')
+
def assert_table_structure(self, items, field_names):
"""Verify that all items have keys listed in field_names."""
for item in items:
diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
new file mode 100644
index 00000000..af0c9bac
--- /dev/null
+++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py
@@ -0,0 +1,181 @@
+# Copyright (c) 2016, Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from openstackclient.tests.functional import base
+
+
+class NetworkQosRuleTestsMinimumBandwidth(base.TestCase):
+ """Functional tests for QoS minimum bandwidth rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ MIN_KBPS = 2800
+ MIN_KBPS_MODIFIED = 7500
+ DIRECTION = '--egress'
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'minimum-bandwidth'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --min-kbps ' +
+ str(cls.MIN_KBPS) + ' ' + cls.DIRECTION +
+ ' ' + cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --min-kbps ' +
+ str(self.MIN_KBPS_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['min_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
+
+
+class NetworkQosRuleTestsDSCPMarking(base.TestCase):
+ """Functional tests for QoS DSCP marking rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ DSCP_MARK = 8
+ DSCP_MARK_MODIFIED = 32
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'dscp-marking'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --dscp-mark ' +
+ str(cls.DSCP_MARK) + ' ' +
+ cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --dscp-mark ' +
+ str(self.DSCP_MARK_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['dscp_mark'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
+
+
+class NetworkQosRuleTestsBandwidthLimit(base.TestCase):
+ """Functional tests for QoS bandwidth limit rule."""
+ RULE_ID = None
+ QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
+ MAX_KBPS = 10000
+ MAX_KBPS_MODIFIED = 15000
+ MAX_BURST_KBITS = 1400
+ MAX_BURST_KBITS_MODIFIED = 1800
+ HEADERS = ['ID']
+ FIELDS = ['id']
+ TYPE = 'bandwidth-limit'
+
+ @classmethod
+ def setUpClass(cls):
+ opts = cls.get_opts(cls.FIELDS)
+ cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
+ cls.RULE_ID = cls.openstack('network qos rule create --type ' +
+ cls.TYPE + ' --max-kbps ' +
+ str(cls.MAX_KBPS) + ' --max-burst-kbits ' +
+ str(cls.MAX_BURST_KBITS) + ' ' +
+ cls.QOS_POLICY_NAME + opts)
+ cls.assertsOutputNotNone(cls.RULE_ID)
+
+ @classmethod
+ def tearDownClass(cls):
+ raw_output = cls.openstack('network qos rule delete ' +
+ cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
+ cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
+ cls.assertOutput('', raw_output)
+
+ def test_qos_policy_list(self):
+ opts = self.get_opts(self.HEADERS)
+ raw_output = self.openstack('network qos rule list '
+ + self.QOS_POLICY_NAME + opts)
+ self.assertIn(self.RULE_ID, raw_output)
+
+ def test_qos_policy_show(self):
+ opts = self.get_opts(self.FIELDS)
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(self.RULE_ID, raw_output)
+
+ def test_qos_policy_set(self):
+ self.openstack('network qos rule set --max-kbps ' +
+ str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
+ str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
+ opts = self.get_opts(['max_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MAX_KBPS_MODIFIED) + "\n", raw_output)
+ opts = self.get_opts(['max_burst_kbps'])
+ raw_output = self.openstack('network qos rule show ' +
+ self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
+ opts)
+ self.assertEqual(str(self.MAX_BURST_KBITS_MODIFIED) + "\n", raw_output)
diff --git a/openstackclient/tests/unit/fakes.py b/openstackclient/tests/unit/fakes.py
index ca6b1d31..626b466d 100644
--- a/openstackclient/tests/unit/fakes.py
+++ b/openstackclient/tests/unit/fakes.py
@@ -219,6 +219,12 @@ class FakeResource(object):
def info(self):
return self._info
+ def __getitem__(self, item):
+ return self._info.get(item)
+
+ def get(self, item, default=None):
+ return self._info.get(item, default)
+
class FakeResponse(requests.Response):
diff --git a/openstackclient/tests/unit/network/v2/fakes.py b/openstackclient/tests/unit/network/v2/fakes.py
index b931cb55..fe0422fa 100644
--- a/openstackclient/tests/unit/network/v2/fakes.py
+++ b/openstackclient/tests/unit/network/v2/fakes.py
@@ -14,6 +14,8 @@
import argparse
import copy
import mock
+from random import choice
+from random import randint
import uuid
from openstackclient.tests.unit import fakes
@@ -37,6 +39,15 @@ QUOTA = {
"l7policy": 5,
}
+RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
+RULE_TYPE_DSCP_MARKING = 'dscp-marking'
+RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT,
+ RULE_TYPE_DSCP_MARKING,
+ RULE_TYPE_MINIMUM_BANDWIDTH]
+VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
+ 34, 36, 38, 40, 46, 48, 56]
+
class FakeNetworkV2Client(object):
@@ -662,83 +673,90 @@ class FakeNetworkRBAC(object):
return mock.Mock(side_effect=rbac_policies)
-class FakeNetworkQosBandwidthLimitRule(object):
- """Fake one or more QoS bandwidth limit rules."""
+class FakeNetworkQosPolicy(object):
+ """Fake one or more QoS policies."""
@staticmethod
- def create_one_qos_bandwidth_limit_rule(attrs=None):
- """Create a fake QoS bandwidth limit rule.
+ def create_one_qos_policy(attrs=None):
+ """Create a fake QoS policy.
:param Dictionary attrs:
A dictionary with all attributes
:return:
- A FakeResource object with id, qos_policy_id, max_kbps and
- max_burst_kbps attributes.
+ A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
+ qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex
+ rule_attrs = {'qos_policy_id': qos_id}
+ rules = [FakeNetworkQosRule.create_one_qos_rule(rule_attrs)]
# Set default attributes.
- qos_bandwidth_limit_rule_attrs = {
- 'id': 'qos-bandwidth-limit-rule-id-' + uuid.uuid4().hex,
- 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
- 'max_kbps': 1500,
- 'max_burst_kbps': 1200,
+ qos_policy_attrs = {
+ 'name': 'qos-policy-name-' + uuid.uuid4().hex,
+ 'id': qos_id,
+ 'tenant_id': 'project-id-' + uuid.uuid4().hex,
+ 'shared': False,
+ 'description': 'qos-policy-description-' + uuid.uuid4().hex,
+ 'rules': rules,
}
# Overwrite default attributes.
- qos_bandwidth_limit_rule_attrs.update(attrs)
+ qos_policy_attrs.update(attrs)
- qos_bandwidth_limit_rule = fakes.FakeResource(
- info=copy.deepcopy(qos_bandwidth_limit_rule_attrs),
+ qos_policy = fakes.FakeResource(
+ info=copy.deepcopy(qos_policy_attrs),
loaded=True)
- return qos_bandwidth_limit_rule
+ # Set attributes with special mapping in OpenStack SDK.
+ qos_policy.is_shared = qos_policy_attrs['shared']
+ qos_policy.project_id = qos_policy_attrs['tenant_id']
+
+ return qos_policy
@staticmethod
- def create_qos_bandwidth_limit_rules(attrs=None, count=2):
- """Create multiple fake QoS bandwidth limit rules.
+ def create_qos_policies(attrs=None, count=2):
+ """Create multiple fake QoS policies.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
- The number of QoS bandwidth limit rules to fake
+ The number of QoS policies to fake
:return:
- A list of FakeResource objects faking the QoS bandwidth limit rules
+ A list of FakeResource objects faking the QoS policies
"""
qos_policies = []
for i in range(0, count):
- qos_policies.append(FakeNetworkQosBandwidthLimitRule.
- create_one_qos_bandwidth_limit_rule(attrs))
+ qos_policies.append(
+ FakeNetworkQosPolicy.create_one_qos_policy(attrs))
return qos_policies
@staticmethod
- def get_qos_bandwidth_limit_rules(qos_rules=None, count=2):
- """Get a list of faked QoS bandwidth limit rules.
+ def get_qos_policies(qos_policies=None, count=2):
+ """Get an iterable MagicMock object with a list of faked QoS policies.
- If QoS bandwidth limit rules list is provided, then initialize the
- Mock object with the list. Otherwise create one.
+ If qos policies list is provided, then initialize the Mock object
+ with the list. Otherwise create one.
:param List address scopes:
- A list of FakeResource objects faking QoS bandwidth limit rules
+ A list of FakeResource objects faking qos policies
:param int count:
- The number of QoS bandwidth limit rules to fake
+ The number of QoS policies to fake
:return:
An iterable Mock object with side_effect set to a list of faked
- qos bandwidth limit rules
+ QoS policies
"""
- if qos_rules is None:
- qos_rules = (FakeNetworkQosBandwidthLimitRule.
- create_qos_bandwidth_limit_rules(count))
- return mock.Mock(side_effect=qos_rules)
+ if qos_policies is None:
+ qos_policies = FakeNetworkQosPolicy.create_qos_policies(count)
+ return mock.Mock(side_effect=qos_policies)
-class FakeNetworkQosPolicy(object):
- """Fake one or more QoS policies."""
+class FakeNetworkQosRule(object):
+ """Fake one or more Network QoS rules."""
@staticmethod
- def create_one_qos_policy(attrs=None):
- """Create a fake QoS policy.
+ def create_one_qos_rule(attrs=None):
+ """Create a fake Network QoS rule.
:param Dictionary attrs:
A dictionary with all attributes
@@ -746,71 +764,69 @@ class FakeNetworkQosPolicy(object):
A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
- qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex
- rule_attrs = {'qos_policy_id': qos_id}
- rules = [
- FakeNetworkQosBandwidthLimitRule.
- create_one_qos_bandwidth_limit_rule(rule_attrs)]
# Set default attributes.
- qos_policy_attrs = {
- 'name': 'qos-policy-name-' + uuid.uuid4().hex,
- 'id': qos_id,
+ type = attrs.get('type') or choice(VALID_QOS_RULES)
+ qos_rule_attrs = {
+ 'id': 'qos-rule-id-' + uuid.uuid4().hex,
+ 'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
- 'shared': False,
- 'description': 'qos-policy-description-' + uuid.uuid4().hex,
- 'rules': rules,
+ 'type': type,
}
+ if type == RULE_TYPE_BANDWIDTH_LIMIT:
+ qos_rule_attrs['max_kbps'] = randint(1, 10000)
+ qos_rule_attrs['max_burst_kbits'] = randint(1, 10000)
+ elif type == RULE_TYPE_DSCP_MARKING:
+ qos_rule_attrs['dscp_mark'] = choice(VALID_DSCP_MARKS)
+ elif type == RULE_TYPE_MINIMUM_BANDWIDTH:
+ qos_rule_attrs['min_kbps'] = randint(1, 10000)
+ qos_rule_attrs['direction'] = 'egress'
# Overwrite default attributes.
- qos_policy_attrs.update(attrs)
+ qos_rule_attrs.update(attrs)
- qos_policy = fakes.FakeResource(
- info=copy.deepcopy(qos_policy_attrs),
- loaded=True)
+ qos_rule = fakes.FakeResource(info=copy.deepcopy(qos_rule_attrs),
+ loaded=True)
# Set attributes with special mapping in OpenStack SDK.
- qos_policy.is_shared = qos_policy_attrs['shared']
- qos_policy.project_id = qos_policy_attrs['tenant_id']
+ qos_rule.project_id = qos_rule['tenant_id']
- return qos_policy
+ return qos_rule
@staticmethod
- def create_qos_policies(attrs=None, count=2):
- """Create multiple fake QoS policies.
+ def create_qos_rules(attrs=None, count=2):
+ """Create multiple fake Network QoS rules.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
- The number of QoS policies to fake
+ The number of Network QoS rule to fake
:return:
- A list of FakeResource objects faking the QoS policies
+ A list of FakeResource objects faking the Network QoS rules
"""
- qos_policies = []
+ qos_rules = []
for i in range(0, count):
- qos_policies.append(
- FakeNetworkQosPolicy.create_one_qos_policy(attrs))
-
- return qos_policies
+ qos_rules.append(FakeNetworkQosRule.create_one_qos_rule(attrs))
+ return qos_rules
@staticmethod
- def get_qos_policies(qos_policies=None, count=2):
- """Get an iterable MagicMock object with a list of faked QoS policies.
+ def get_qos_rules(qos_rules=None, count=2):
+ """Get a list of faked Network QoS rules.
- If qos policies list is provided, then initialize the Mock object
- with the list. Otherwise create one.
+ If Network QoS rules list is provided, then initialize the Mock
+ object with the list. Otherwise create one.
:param List address scopes:
- A list of FakeResource objects faking qos policies
+ A list of FakeResource objects faking Network QoS rules
:param int count:
- The number of QoS policies to fake
+ The number of QoS minimum bandwidth rules to fake
:return:
An iterable Mock object with side_effect set to a list of faked
- QoS policies
+ qos minimum bandwidth rules
"""
- if qos_policies is None:
- qos_policies = FakeNetworkQosPolicy.create_qos_policies(count)
- return mock.Mock(side_effect=qos_policies)
+ if qos_rules is None:
+ qos_rules = (FakeNetworkQosRule.create_qos_rules(count))
+ return mock.Mock(side_effect=qos_rules)
class FakeNetworkQosRuleType(object):
diff --git a/openstackclient/tests/unit/network/v2/test_network_qos_rule.py b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
new file mode 100644
index 00000000..e66f25b7
--- /dev/null
+++ b/openstackclient/tests/unit/network/v2/test_network_qos_rule.py
@@ -0,0 +1,997 @@
+# Copyright (c) 2016, Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from osc_lib import exceptions
+
+from openstackclient.network.v2 import network_qos_rule
+from openstackclient.tests.unit.network.v2 import fakes as network_fakes
+from openstackclient.tests.unit import utils as tests_utils
+
+
+RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
+RULE_TYPE_DSCP_MARKING = 'dscp-marking'
+RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
+DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
+ 34, 36, 38, 40, 46, 48, 56]
+
+
+class TestNetworkQosRule(network_fakes.TestNetworkV2):
+
+ def setUp(self):
+ super(TestNetworkQosRule, self).setUp()
+ # Get a shortcut to the network client
+ self.network = self.app.client_manager.network
+ self.qos_policy = (network_fakes.FakeNetworkQosPolicy.
+ create_one_qos_policy())
+ self.network.find_qos_policy = mock.Mock(return_value=self.qos_policy)
+
+
+class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_BANDWIDTH,
+ '--min-kbps', str(self.new_rule.min_kbps),
+ '--egress',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_BANDWIDTH),
+ ('min_kbps', self.new_rule.min_kbps),
+ ('egress', True),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'min_kbps': self.new_rule.min_kbps,
+ 'direction': self.new_rule.direction}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_MINIMUM_BANDWIDTH,
+ '--max-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_MINIMUM_BANDWIDTH),
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "minimum-bandwidth" '
+ 'requires arguments min_kbps, direction')
+ self.assertEqual(msg, str(e))
+
+
+class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'dscp_mark',
+ 'id',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.dscp_mark,
+ self.new_rule.id,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_DSCP_MARKING,
+ '--dscp-mark', str(self.new_rule.dscp_mark),
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_DSCP_MARKING),
+ ('dscp_mark', self.new_rule.dscp_mark),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.create_qos_dscp_marking_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'dscp_mark': self.new_rule.dscp_mark}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_DSCP_MARKING,
+ '--max-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_DSCP_MARKING),
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "dscp-marking" '
+ 'requires arguments dscp_mark')
+ self.assertEqual(msg, str(e))
+
+
+class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule):
+
+ def test_check_type_parameters(self):
+ pass
+
+ def setUp(self):
+ super(TestCreateNetworkQosRuleBandwidtLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.columns = (
+ 'id',
+ 'max_burst_kbits',
+ 'max_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+
+ self.data = (
+ self.new_rule.id,
+ self.new_rule.max_burst_kbits,
+ self.new_rule.max_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+ self.network.create_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_create_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_create_default_options(self):
+ arglist = [
+ '--type', RULE_TYPE_BANDWIDTH_LIMIT,
+ '--max-kbps', str(self.new_rule.max_kbps),
+ '--max-burst-kbits', str(self.new_rule.max_burst_kbits),
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_BANDWIDTH_LIMIT),
+ ('max_kbps', self.new_rule.max_kbps),
+ ('max_burst_kbits', self.new_rule.max_burst_kbits),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = (self.cmd.take_action(parsed_args))
+
+ self.network.create_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.qos_policy.id,
+ **{'max_kbps': self.new_rule.max_kbps,
+ 'max_burst_kbps': self.new_rule.max_burst_kbits}
+ )
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(self.data, data)
+
+ def test_create_wrong_options(self):
+ arglist = [
+ '--type', RULE_TYPE_BANDWIDTH_LIMIT,
+ '--min-kbps', '10000',
+ self.new_rule.qos_policy_id,
+ ]
+
+ verifylist = [
+ ('type', RULE_TYPE_BANDWIDTH_LIMIT),
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('"Create" rule command for type "bandwidth-limit" '
+ 'requires arguments max_kbps, max_burst_kbps')
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_bandwidth_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_minimum_bandwidth_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_dscp_marking_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_dscp_marking_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_dscp_marking_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_dscp_marking_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestDeleteNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestDeleteNetworkQosRuleBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.delete_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_bandwidth_limit_rule = (
+ network_fakes.FakeNetworkQosRule.get_qos_rules(
+ qos_rules=self.new_rule)
+ )
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_policy_delete(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.network.delete_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_qos_policy_delete_error(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ self.network.delete_qos_bandwidth_limit_rule.side_effect = \
+ Exception('Error message')
+ try:
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
+ {'rule': self.new_rule.id, 'e': 'Error message'})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_min_kbps(self):
+ arglist = [
+ '--min-kbps', str(self.new_rule.min_kbps),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kbps', self.new_rule.min_kbps),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'min_kbps': self.new_rule.min_kbps,
+ }
+ self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--max-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"minimum-bandwidth" only requires arguments min_kbps, '
+ 'direction' % {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_dscp_marking_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_dscp_marking_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_dscp_mark(self):
+ arglist = [
+ '--dscp-mark', str(self.new_rule.dscp_mark),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('dscp_mark', self.new_rule.dscp_mark),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'dscp_mark': self.new_rule.dscp_mark,
+ }
+ self.network.update_qos_dscp_marking_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--max-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"dscp-marking" only requires arguments dscp_mark' %
+ {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestSetNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestSetNetworkQosRuleBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs=attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.network.update_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=None)
+ self.network.find_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+ self.network.find_qos_policy = mock.Mock(
+ return_value=self.qos_policy)
+
+ # Get the command object to test
+ self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
+ self.namespace))
+
+ def test_set_nothing(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id)
+ self.assertIsNone(result)
+
+ def test_set_max_kbps(self):
+ arglist = [
+ '--max-kbps', str(self.new_rule.max_kbps),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_kbps', self.new_rule.max_kbps),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'max_kbps': self.new_rule.max_kbps,
+ }
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_max_burst_kbits(self):
+ arglist = [
+ '--max-burst-kbits', str(self.new_rule.max_burst_kbits),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('max_burst_kbits', self.new_rule.max_burst_kbits),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ result = self.cmd.take_action(parsed_args)
+
+ attrs = {
+ 'max_burst_kbps': self.new_rule.max_burst_kbits,
+ }
+ self.network.update_qos_bandwidth_limit_rule.assert_called_with(
+ self.new_rule, self.qos_policy.id, **attrs)
+ self.assertIsNone(result)
+
+ def test_set_wrong_options(self):
+ arglist = [
+ '--min-kbps', str(10000),
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('min_kbps', 10000),
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ try:
+ self.cmd.take_action(parsed_args)
+ except exceptions.CommandError as e:
+ msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
+ '"bandwidth-limit" only requires arguments max_kbps, '
+ 'max_burst_kbps' % {'rule': self.new_rule.id})
+ self.assertEqual(msg, str(e))
+
+
+class TestListNetworkQosRule(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestListNetworkQosRule, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ attrs['type'] = RULE_TYPE_DSCP_MARKING
+ self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ attrs['type'] = RULE_TYPE_BANDWIDTH_LIMIT
+ self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule.
+ create_one_qos_rule(attrs=attrs))
+ self.qos_policy.rules = [self.new_rule_min_bw,
+ self.new_rule_dscp_mark,
+ self.new_rule_max_bw]
+ self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule_min_bw)
+ self.network.find_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule_dscp_mark)
+ self.network.find_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule_max_bw)
+ self.columns = (
+ 'ID',
+ 'QoS Policy ID',
+ 'Type',
+ 'Max Kbps',
+ 'Max Burst Kbits',
+ 'Min Kbps',
+ 'DSCP mark',
+ 'Direction',
+ )
+ self.data = []
+ for index in range(len(self.qos_policy.rules)):
+ self.data.append((
+ self.qos_policy.rules[index].id,
+ self.qos_policy.rules[index].qos_policy_id,
+ self.qos_policy.rules[index].type,
+ getattr(self.qos_policy.rules[index], 'max_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'min_kbps', ''),
+ getattr(self.qos_policy.rules[index], 'dscp_mark', ''),
+ getattr(self.qos_policy.rules[index], 'direction', ''),
+ ))
+ # Get the command object to test
+ self.cmd = network_qos_rule.ListNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_qos_rule_list(self):
+ arglist = [
+ self.qos_policy.id
+ ]
+ verifylist = [
+ ('qos_policy', self.qos_policy.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.find_qos_policy.assert_called_once_with(
+ self.qos_policy.id, ignore_missing=False)
+ self.assertEqual(self.columns, columns)
+ list_data = list(data)
+ self.assertEqual(len(self.data), len(list_data))
+ for index in range(len(list_data)):
+ self.assertEqual(self.data[index], list_data[index])
+
+
+class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosRuleMinimumBandwidth, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_MINIMUM_BANDWIDTH}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'direction',
+ 'id',
+ 'min_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.direction,
+ self.new_rule.id,
+ self.new_rule.min_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_minimum_bandwidth_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_minimum_bandwidth_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
+
+
+class TestShowNetworkQosDSCPMarking(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosDSCPMarking, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_DSCP_MARKING}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'dscp_mark',
+ 'id',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.dscp_mark,
+ self.new_rule.id,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_dscp_marking_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_dscp_marking_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))
+
+
+class TestShowNetworkQosBandwidthLimit(TestNetworkQosRule):
+
+ def setUp(self):
+ super(TestShowNetworkQosBandwidthLimit, self).setUp()
+ attrs = {'qos_policy_id': self.qos_policy.id,
+ 'type': RULE_TYPE_BANDWIDTH_LIMIT}
+ self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
+ attrs)
+ self.qos_policy.rules = [self.new_rule]
+ self.columns = (
+ 'id',
+ 'max_burst_kbits',
+ 'max_kbps',
+ 'project_id',
+ 'qos_policy_id',
+ 'type'
+ )
+ self.data = (
+ self.new_rule.id,
+ self.new_rule.max_burst_kbits,
+ self.new_rule.max_kbps,
+ self.new_rule.project_id,
+ self.new_rule.qos_policy_id,
+ self.new_rule.type,
+ )
+
+ self.network.get_qos_bandwidth_limit_rule = mock.Mock(
+ return_value=self.new_rule)
+
+ # Get the command object to test
+ self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
+ self.namespace)
+
+ def test_show_no_options(self):
+ arglist = []
+ verifylist = []
+
+ # Missing required args should bail here
+ self.assertRaises(tests_utils.ParserException, self.check_parser,
+ self.cmd, arglist, verifylist)
+
+ def test_show_all_options(self):
+ arglist = [
+ self.new_rule.qos_policy_id,
+ self.new_rule.id,
+ ]
+ verifylist = [
+ ('qos_policy', self.new_rule.qos_policy_id),
+ ('id', self.new_rule.id),
+ ]
+
+ parsed_args = self.check_parser(self.cmd, arglist, verifylist)
+ columns, data = self.cmd.take_action(parsed_args)
+
+ self.network.get_qos_bandwidth_limit_rule.assert_called_once_with(
+ self.new_rule.id, self.qos_policy.id)
+ self.assertEqual(self.columns, columns)
+ self.assertEqual(list(self.data), list(data))