summaryrefslogtreecommitdiff
path: root/wsme/types.py
diff options
context:
space:
mode:
Diffstat (limited to 'wsme/types.py')
-rw-r--r--wsme/types.py840
1 files changed, 0 insertions, 840 deletions
diff --git a/wsme/types.py b/wsme/types.py
deleted file mode 100644
index 0a5fc02..0000000
--- a/wsme/types.py
+++ /dev/null
@@ -1,840 +0,0 @@
-import base64
-import datetime
-import decimal
-import inspect
-import logging
-import netaddr
-import re
-import six
-import sys
-import uuid
-import weakref
-
-from wsme import exc
-
-log = logging.getLogger(__name__)
-
-#: The 'str' (python 2) or 'bytes' (python 3) type.
-#: Its use should be restricted to
-#: pure ascii strings as the protocols will generally not be
-#: be able to send non-unicode strings.
-#: To transmit binary strings, use the :class:`binary` type
-bytes = six.binary_type
-
-#: Unicode string.
-text = six.text_type
-
-
-class ArrayType(object):
- def __init__(self, item_type):
- if iscomplex(item_type):
- self._item_type = weakref.ref(item_type)
- else:
- self._item_type = item_type
-
- def __hash__(self):
- return hash(self.item_type)
-
- def __eq__(self, other):
- return isinstance(other, ArrayType) \
- and self.item_type == other.item_type
-
- def sample(self):
- return [getattr(self.item_type, 'sample', self.item_type)()]
-
- @property
- def item_type(self):
- if isinstance(self._item_type, weakref.ref):
- return self._item_type()
- else:
- return self._item_type
-
- def validate(self, value):
- if value is None:
- return
- if not isinstance(value, list):
- raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
- self.item_type, type(value)
- ))
- return [
- validate_value(self.item_type, item)
- for item in value
- ]
-
-
-class DictType(object):
- def __init__(self, key_type, value_type):
- if key_type not in pod_types:
- raise ValueError("Dictionaries key can only be a pod type")
- self.key_type = key_type
- if iscomplex(value_type):
- self._value_type = weakref.ref(value_type)
- else:
- self._value_type = value_type
-
- def __hash__(self):
- return hash((self.key_type, self.value_type))
-
- def sample(self):
- key = getattr(self.key_type, 'sample', self.key_type)()
- value = getattr(self.value_type, 'sample', self.value_type)()
- return {key: value}
-
- @property
- def value_type(self):
- if isinstance(self._value_type, weakref.ref):
- return self._value_type()
- else:
- return self._value_type
-
- def validate(self, value):
- if not isinstance(value, dict):
- raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
- self.key_type, self.value_type, type(value)
- ))
- return dict((
- (
- validate_value(self.key_type, key),
- validate_value(self.value_type, v)
- ) for key, v in value.items()
- ))
-
-
-class UserType(object):
- basetype = None
- name = None
-
- def validate(self, value):
- return value
-
- def tobasetype(self, value):
- return value
-
- def frombasetype(self, value):
- return value
-
-
-def isusertype(class_):
- return isinstance(class_, UserType)
-
-
-class BinaryType(UserType):
- """
- A user type that use base64 strings to carry binary data.
- """
- basetype = bytes
- name = 'binary'
-
- def tobasetype(self, value):
- if value is None:
- return None
- return base64.encodestring(value)
-
- def frombasetype(self, value):
- if value is None:
- return None
- return base64.decodestring(value)
-
-
-#: The binary almost-native type
-binary = BinaryType()
-
-
-class IntegerType(UserType):
- """
- A simple integer type. Can validate a value range.
-
- :param minimum: Possible minimum value
- :param maximum: Possible maximum value
-
- Example::
-
- Price = IntegerType(minimum=1)
-
- """
- basetype = int
- name = "integer"
-
- def __init__(self, minimum=None, maximum=None):
- self.minimum = minimum
- self.maximum = maximum
-
- @staticmethod
- def frombasetype(value):
- return int(value) if value is not None else None
-
- def validate(self, value):
- if self.minimum is not None and value < self.minimum:
- error = 'Value should be greater or equal to %s' % self.minimum
- raise ValueError(error)
-
- if self.maximum is not None and value > self.maximum:
- error = 'Value should be lower or equal to %s' % self.maximum
- raise ValueError(error)
-
- return value
-
-
-class StringType(UserType):
- """
- A simple string type. Can validate a length and a pattern.
-
- :param min_length: Possible minimum length
- :param max_length: Possible maximum length
- :param pattern: Possible string pattern
-
- Example::
-
- Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')
-
- """
- basetype = six.string_types
- name = "string"
-
- def __init__(self, min_length=None, max_length=None, pattern=None):
- self.min_length = min_length
- self.max_length = max_length
- if isinstance(pattern, six.string_types):
- self.pattern = re.compile(pattern)
- else:
- self.pattern = pattern
-
- def validate(self, value):
- if not isinstance(value, self.basetype):
- error = 'Value should be string'
- raise ValueError(error)
-
- if self.min_length is not None and len(value) < self.min_length:
- error = 'Value should have a minimum character requirement of %s' \
- % self.min_length
- raise ValueError(error)
-
- if self.max_length is not None and len(value) > self.max_length:
- error = 'Value should have a maximum character requirement of %s' \
- % self.max_length
- raise ValueError(error)
-
- if self.pattern is not None and not self.pattern.search(value):
- error = 'Value should match the pattern %s' % self.pattern.pattern
- raise ValueError(error)
-
- return value
-
-
-class IPv4AddressType(UserType):
- """
- A simple IPv4 type.
- """
- basetype = six.string_types
- name = "ipv4address"
-
- @staticmethod
- def validate(value):
- try:
- netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON)
- except netaddr.AddrFormatError:
- error = 'Value should be IPv4 format'
- raise ValueError(error)
- else:
- return value
-
-
-class IPv6AddressType(UserType):
- """
- A simple IPv6 type.
-
- This type represents IPv6 addresses in the short format.
- """
- basetype = six.string_types
- name = "ipv6address"
-
- @staticmethod
- def validate(value):
- try:
- netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON)
- except netaddr.AddrFormatError:
- error = 'Value should be IPv6 format'
- raise ValueError(error)
- else:
- return value
-
-
-class UuidType(UserType):
- """
- A simple UUID type.
-
- This type allows not only UUID having dashes but also UUID not
- having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce'
- and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid.
- """
- basetype = six.string_types
- name = "uuid"
-
- @staticmethod
- def validate(value):
- try:
- return six.text_type((uuid.UUID(value)))
- except (TypeError, ValueError, AttributeError):
- error = 'Value should be UUID format'
- raise ValueError(error)
-
-
-class Enum(UserType):
- """
- A simple enumeration type. Can be based on any non-complex type.
-
- :param basetype: The actual data type
- :param values: A set of possible values
-
- If nullable, 'None' should be added the values set.
-
- Example::
-
- Gender = Enum(str, 'male', 'female')
- Specie = Enum(str, 'cat', 'dog')
-
- """
- def __init__(self, basetype, *values, **kw):
- self.basetype = basetype
- self.values = set(values)
- name = kw.pop('name', None)
- if name is None:
- name = "Enum(%s)" % ', '.join((six.text_type(v) for v in values))
- self.name = name
-
- def validate(self, value):
- if value not in self.values:
- raise ValueError("Value should be one of: %s" %
- ', '.join(map(six.text_type, self.values)))
- return value
-
- def tobasetype(self, value):
- return value
-
- def frombasetype(self, value):
- return value
-
-
-class UnsetType(object):
- if sys.version < '3':
- def __nonzero__(self):
- return False
- else:
- def __bool__(self):
- return False
-
- def __repr__(self):
- return 'Unset'
-
-
-Unset = UnsetType()
-
-#: A special type that corresponds to the host framework request object.
-#: It can only be used in the function parameters, and if so the request object
-#: of the host framework will be passed to the function.
-HostRequest = object()
-
-
-pod_types = six.integer_types + (
- bytes, text, float, bool)
-dt_types = (datetime.date, datetime.time, datetime.datetime)
-extra_types = (binary, decimal.Decimal)
-native_types = pod_types + dt_types + extra_types
-# The types for which we allow promotion to certain numbers.
-_promotable_types = six.integer_types + (text, bytes)
-
-
-def iscomplex(datatype):
- return inspect.isclass(datatype) \
- and '_wsme_attributes' in datatype.__dict__
-
-
-def isarray(datatype):
- return isinstance(datatype, ArrayType)
-
-
-def isdict(datatype):
- return isinstance(datatype, DictType)
-
-
-def validate_value(datatype, value):
- if value in (Unset, None):
- return value
-
- # Try to promote the data type to one of our complex types.
- if isinstance(datatype, list):
- datatype = ArrayType(datatype[0])
- elif isinstance(datatype, dict):
- datatype = DictType(*list(datatype.items())[0])
-
- # If the datatype has its own validator, use that.
- if hasattr(datatype, 'validate'):
- return datatype.validate(value)
-
- # Do type promotion/conversion and data validation for builtin
- # types.
- v_type = type(value)
- if datatype in six.integer_types:
- if v_type in _promotable_types:
- try:
- # Try to turn the value into an int
- value = datatype(value)
- except ValueError:
- # An error is raised at the end of the function
- # when the types don't match.
- pass
- elif datatype is float and v_type in _promotable_types:
- try:
- value = float(value)
- except ValueError:
- # An error is raised at the end of the function
- # when the types don't match.
- pass
- elif datatype is text and isinstance(value, bytes):
- value = value.decode()
- elif datatype is bytes and isinstance(value, text):
- value = value.encode()
-
- if not isinstance(value, datatype):
- raise ValueError(
- "Wrong type. Expected '%s', got '%s'" % (
- datatype, v_type
- ))
- return value
-
-
-class wsproperty(property):
- """
- A specialised :class:`property` to define typed-property on complex types.
- Example::
-
- class MyComplexType(wsme.types.Base):
- def get_aint(self):
- return self._aint
-
- def set_aint(self, value):
- assert avalue < 10 # Dummy input validation
- self._aint = value
-
- aint = wsproperty(int, get_aint, set_aint, mandatory=True)
- """
- def __init__(self, datatype, fget, fset=None,
- mandatory=False, doc=None, name=None):
- property.__init__(self, fget, fset)
- #: The property name in the parent python class
- self.key = None
- #: The attribute name on the public of the api.
- #: Defaults to :attr:`key`
- self.name = name
- #: property data type
- self.datatype = datatype
- #: True if the property is mandatory
- self.mandatory = mandatory
-
-
-class wsattr(object):
- """
- Complex type attribute definition.
-
- Example::
-
- class MyComplexType(wsme.types.Base):
- optionalvalue = int
- mandatoryvalue = wsattr(int, mandatory=True)
- named_value = wsattr(int, name='named.value')
-
- After inspection, the non-wsattr attributes will be replaced, and
- the above class will be equivalent to::
-
- class MyComplexType(wsme.types.Base):
- optionalvalue = wsattr(int)
- mandatoryvalue = wsattr(int, mandatory=True)
-
- """
- def __init__(self, datatype, mandatory=False, name=None, default=Unset,
- readonly=False):
- #: The attribute name in the parent python class.
- #: Set by :func:`inspect_class`
- self.key = None # will be set by class inspection
- #: The attribute name on the public of the api.
- #: Defaults to :attr:`key`
- self.name = name
- self._datatype = (datatype,)
- #: True if the attribute is mandatory
- self.mandatory = mandatory
- #: Default value. The attribute will return this instead
- #: of :data:`Unset` if no value has been set.
- self.default = default
- #: If True value cannot be set from json/xml input data
- self.readonly = readonly
-
- self.complextype = None
-
- def _get_dataholder(self, instance):
- dataholder = getattr(instance, '_wsme_dataholder', None)
- if dataholder is None:
- dataholder = instance._wsme_DataHolderClass()
- instance._wsme_dataholder = dataholder
- return dataholder
-
- def __get__(self, instance, owner):
- if instance is None:
- return self
- return getattr(
- self._get_dataholder(instance),
- self.key,
- self.default
- )
-
- def __set__(self, instance, value):
- try:
- value = validate_value(self.datatype, value)
- except (ValueError, TypeError) as e:
- raise exc.InvalidInput(self.name, value, six.text_type(e))
- dataholder = self._get_dataholder(instance)
- if value is Unset:
- if hasattr(dataholder, self.key):
- delattr(dataholder, self.key)
- else:
- setattr(dataholder, self.key, value)
-
- def __delete__(self, instance):
- self.__set__(instance, Unset)
-
- def _get_datatype(self):
- if isinstance(self._datatype, tuple):
- self._datatype = \
- self.complextype().__registry__.resolve_type(self._datatype[0])
- if isinstance(self._datatype, weakref.ref):
- return self._datatype()
- if isinstance(self._datatype, list):
- return [
- item() if isinstance(item, weakref.ref) else item
- for item in self._datatype
- ]
- return self._datatype
-
- def _set_datatype(self, datatype):
- self._datatype = datatype
-
- #: attribute data type. Can be either an actual type,
- #: or a type name, in which case the actual type will be
- #: determined when needed (generally just before scanning the api).
- datatype = property(_get_datatype, _set_datatype)
-
-
-def iswsattr(attr):
- if inspect.isfunction(attr) or inspect.ismethod(attr):
- return False
- if isinstance(attr, property) and not isinstance(attr, wsproperty):
- return False
- return True
-
-
-def sort_attributes(class_, attributes):
- """Sort a class attributes list.
-
- 3 mechanisms are attempted :
-
- #. Look for a _wsme_attr_order attribute on the class_. This allow
- to define an arbitrary order of the attributes (useful for
- generated types).
-
- #. Access the object source code to find the declaration order.
-
- #. Sort by alphabetically"""
-
- if not len(attributes):
- return
-
- attrs = dict((a.key, a) for a in attributes)
-
- if hasattr(class_, '_wsme_attr_order'):
- names_order = class_._wsme_attr_order
- else:
- names = attrs.keys()
- names_order = []
- try:
- lines = []
- for cls in inspect.getmro(class_):
- if cls is object:
- continue
- lines[len(lines):] = inspect.getsourcelines(cls)[0]
- for line in lines:
- line = line.strip().replace(" ", "")
- if '=' in line:
- aname = line[:line.index('=')]
- if aname in names and aname not in names_order:
- names_order.append(aname)
- if len(names_order) < len(names):
- names_order.extend((
- name for name in names if name not in names_order))
- assert len(names_order) == len(names)
- except (TypeError, IOError):
- names_order = list(names)
- names_order.sort()
-
- attributes[:] = [attrs[name] for name in names_order]
-
-
-def inspect_class(class_):
- """Extract a list of (name, wsattr|wsproperty) for the given class_"""
- attributes = []
- for name, attr in inspect.getmembers(class_, iswsattr):
- if name.startswith('_'):
- continue
- if inspect.isroutine(attr):
- continue
-
- if isinstance(attr, (wsattr, wsproperty)):
- attrdef = attr
- else:
- if attr not in native_types and (
- inspect.isclass(attr) or
- isinstance(attr, (list, dict))):
- register_type(attr)
- attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
-
- attrdef.key = name
- if attrdef.name is None:
- attrdef.name = name
- attrdef.complextype = weakref.ref(class_)
- attributes.append(attrdef)
- setattr(class_, name, attrdef)
-
- sort_attributes(class_, attributes)
- return attributes
-
-
-def list_attributes(class_):
- """
- Returns a list of a complex type attributes.
- """
- if not iscomplex(class_):
- raise TypeError("%s is not a registered type")
- return class_._wsme_attributes
-
-
-def make_dataholder(class_):
- # the slots are computed outside the class scope to avoid
- # 'attr' to pullute the class namespace, which leads to weird
- # things if one of the slots is named 'attr'.
- slots = [attr.key for attr in class_._wsme_attributes]
-
- class DataHolder(object):
- __slots__ = slots
-
- DataHolder.__name__ = class_.__name__ + 'DataHolder'
- return DataHolder
-
-
-class Registry(object):
- def __init__(self):
- self._complex_types = []
- self.array_types = set()
- self.dict_types = set()
-
- @property
- def complex_types(self):
- return [t() for t in self._complex_types if t()]
-
- def register(self, class_):
- """
- Make sure a type is registered.
-
- It is automatically called by :class:`expose() <wsme.expose>`
- and :class:`validate() <wsme.validate>`.
- Unless you want to control when the class inspection is done there
- is no need to call it.
- """
- if class_ is None or \
- class_ in native_types or \
- isusertype(class_) or iscomplex(class_) or \
- isarray(class_) or isdict(class_):
- return class_
-
- if isinstance(class_, list):
- if len(class_) != 1:
- raise ValueError("Cannot register type %s" % repr(class_))
- dt = ArrayType(class_[0])
- self.register(dt.item_type)
- self.array_types.add(dt)
- return dt
-
- if isinstance(class_, dict):
- if len(class_) != 1:
- raise ValueError("Cannot register type %s" % repr(class_))
- dt = DictType(*list(class_.items())[0])
- self.register(dt.value_type)
- self.dict_types.add(dt)
- return dt
-
- class_._wsme_attributes = None
- class_._wsme_attributes = inspect_class(class_)
- class_._wsme_DataHolderClass = make_dataholder(class_)
-
- class_.__registry__ = self
- self._complex_types.append(weakref.ref(class_))
- return class_
-
- def reregister(self, class_):
- """Register a type which may already have been registered.
- """
- self._unregister(class_)
- return self.register(class_)
-
- def _unregister(self, class_):
- """Remove a previously registered type.
- """
- # Clear the existing attribute reference so it is rebuilt if
- # the class is registered again later.
- if hasattr(class_, '_wsme_attributes'):
- del class_._wsme_attributes
- # FIXME(dhellmann): This method does not recurse through the
- # types like register() does. Should it?
- if isinstance(class_, list):
- at = ArrayType(class_[0])
- try:
- self.array_types.remove(at)
- except KeyError:
- pass
- elif isinstance(class_, dict):
- key_type, value_type = list(class_.items())[0]
- self.dict_types = set(
- dt for dt in self.dict_types
- if (dt.key_type, dt.value_type) != (key_type, value_type)
- )
- # We can't use remove() here because the items in
- # _complex_types are weakref objects pointing to the classes,
- # so we can't compare with them directly.
- self._complex_types = [
- ct for ct in self._complex_types
- if ct() is not class_
- ]
-
- def lookup(self, typename):
- log.debug('Lookup %s' % typename)
- modname = None
- if '.' in typename:
- modname, typename = typename.rsplit('.', 1)
- for ct in self._complex_types:
- ct = ct()
- if ct is not None and typename == ct.__name__ and (
- modname is None or modname == ct.__module__):
- return ct
-
- def resolve_type(self, type_):
- if isinstance(type_, six.string_types):
- return self.lookup(type_)
- if isinstance(type_, list):
- type_ = ArrayType(type_[0])
- if isinstance(type_, dict):
- type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
- if isinstance(type_, ArrayType):
- type_ = ArrayType(self.resolve_type(type_.item_type))
- self.array_types.add(type_)
- elif isinstance(type_, DictType):
- type_ = DictType(
- type_.key_type,
- self.resolve_type(type_.value_type)
- )
- self.dict_types.add(type_)
- else:
- type_ = self.register(type_)
- return type_
-
-
-# Default type registry
-registry = Registry()
-
-
-def register_type(class_):
- return registry.register(class_)
-
-
-class BaseMeta(type):
- def __new__(cls, name, bases, dct):
- if bases and bases[0] is not object and '__registry__' not in dct:
- dct['__registry__'] = registry
- return type.__new__(cls, name, bases, dct)
-
- def __init__(cls, name, bases, dct):
- if bases and bases[0] is not object and cls.__registry__:
- cls.__registry__.register(cls)
-
-
-class Base(six.with_metaclass(BaseMeta)):
- """Base type for complex types"""
- def __init__(self, **kw):
- for key, value in kw.items():
- if hasattr(self, key):
- setattr(self, key, value)
-
-
-class File(Base):
- """A complex type that represents a file.
-
- In the particular case of protocol accepting form encoded data as
- input, File can be loaded from a form file field.
- """
- #: The file name
- filename = wsattr(text)
-
- #: Mime type of the content
- contenttype = wsattr(text)
-
- def _get_content(self):
- if self._content is None and self._file:
- self._content = self._file.read()
- return self._content
-
- def _set_content(self, value):
- self._content = value
- self._file = None
-
- #: File content
- content = wsproperty(binary, _get_content, _set_content)
-
- def __init__(self, filename=None, file=None, content=None,
- contenttype=None, fieldstorage=None):
- self.filename = filename
- self.contenttype = contenttype
- self._file = file
- self._content = content
-
- if fieldstorage is not None:
- if fieldstorage.file:
- self._file = fieldstorage.file
- self.filename = fieldstorage.filename
- self.contenttype = text(fieldstorage.type)
- else:
- self._content = fieldstorage.value
-
- @property
- def file(self):
- if self._file is None and self._content:
- self._file = six.BytesIO(self._content)
- return self._file
-
-
-class DynamicBase(Base):
- """Base type for complex types for which all attributes are not
- defined when the class is constructed.
-
- This class is meant to be used as a base for types that have
- properties added after the main class is created, such as by
- loading plugins.
-
- """
-
- @classmethod
- def add_attributes(cls, **attrs):
- """Add more attributes
-
- The arguments should be valid Python attribute names
- associated with a type for the new attribute.
-
- """
- for n, t in attrs.items():
- setattr(cls, n, t)
- cls.__registry__.reregister(cls)