diff options
author | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
---|---|---|
committer | Jim Rollenhagen <jim@jimrollenhagen.com> | 2019-09-26 09:43:27 -0400 |
commit | e9c6edfe510f4ed407f8d2d84b4b931a382b48b3 (patch) | |
tree | 94bbd6a34bcf09e99f7ae1be88b19960192d6adb /wsme/types.py | |
parent | 1d73d6e50411ebc45fb96a6ed3c63ca91a500323 (diff) | |
download | wsme-master.tar.gz |
Diffstat (limited to 'wsme/types.py')
-rw-r--r-- | wsme/types.py | 840 |
1 files changed, 0 insertions, 840 deletions
diff --git a/wsme/types.py b/wsme/types.py deleted file mode 100644 index 0a5fc02..0000000 --- a/wsme/types.py +++ /dev/null @@ -1,840 +0,0 @@ -import base64 -import datetime -import decimal -import inspect -import logging -import netaddr -import re -import six -import sys -import uuid -import weakref - -from wsme import exc - -log = logging.getLogger(__name__) - -#: The 'str' (python 2) or 'bytes' (python 3) type. -#: Its use should be restricted to -#: pure ascii strings as the protocols will generally not be -#: be able to send non-unicode strings. -#: To transmit binary strings, use the :class:`binary` type -bytes = six.binary_type - -#: Unicode string. -text = six.text_type - - -class ArrayType(object): - def __init__(self, item_type): - if iscomplex(item_type): - self._item_type = weakref.ref(item_type) - else: - self._item_type = item_type - - def __hash__(self): - return hash(self.item_type) - - def __eq__(self, other): - return isinstance(other, ArrayType) \ - and self.item_type == other.item_type - - def sample(self): - return [getattr(self.item_type, 'sample', self.item_type)()] - - @property - def item_type(self): - if isinstance(self._item_type, weakref.ref): - return self._item_type() - else: - return self._item_type - - def validate(self, value): - if value is None: - return - if not isinstance(value, list): - raise ValueError("Wrong type. Expected '[%s]', got '%s'" % ( - self.item_type, type(value) - )) - return [ - validate_value(self.item_type, item) - for item in value - ] - - -class DictType(object): - def __init__(self, key_type, value_type): - if key_type not in pod_types: - raise ValueError("Dictionaries key can only be a pod type") - self.key_type = key_type - if iscomplex(value_type): - self._value_type = weakref.ref(value_type) - else: - self._value_type = value_type - - def __hash__(self): - return hash((self.key_type, self.value_type)) - - def sample(self): - key = getattr(self.key_type, 'sample', self.key_type)() - value = getattr(self.value_type, 'sample', self.value_type)() - return {key: value} - - @property - def value_type(self): - if isinstance(self._value_type, weakref.ref): - return self._value_type() - else: - return self._value_type - - def validate(self, value): - if not isinstance(value, dict): - raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % ( - self.key_type, self.value_type, type(value) - )) - return dict(( - ( - validate_value(self.key_type, key), - validate_value(self.value_type, v) - ) for key, v in value.items() - )) - - -class UserType(object): - basetype = None - name = None - - def validate(self, value): - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -def isusertype(class_): - return isinstance(class_, UserType) - - -class BinaryType(UserType): - """ - A user type that use base64 strings to carry binary data. - """ - basetype = bytes - name = 'binary' - - def tobasetype(self, value): - if value is None: - return None - return base64.encodestring(value) - - def frombasetype(self, value): - if value is None: - return None - return base64.decodestring(value) - - -#: The binary almost-native type -binary = BinaryType() - - -class IntegerType(UserType): - """ - A simple integer type. Can validate a value range. - - :param minimum: Possible minimum value - :param maximum: Possible maximum value - - Example:: - - Price = IntegerType(minimum=1) - - """ - basetype = int - name = "integer" - - def __init__(self, minimum=None, maximum=None): - self.minimum = minimum - self.maximum = maximum - - @staticmethod - def frombasetype(value): - return int(value) if value is not None else None - - def validate(self, value): - if self.minimum is not None and value < self.minimum: - error = 'Value should be greater or equal to %s' % self.minimum - raise ValueError(error) - - if self.maximum is not None and value > self.maximum: - error = 'Value should be lower or equal to %s' % self.maximum - raise ValueError(error) - - return value - - -class StringType(UserType): - """ - A simple string type. Can validate a length and a pattern. - - :param min_length: Possible minimum length - :param max_length: Possible maximum length - :param pattern: Possible string pattern - - Example:: - - Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$') - - """ - basetype = six.string_types - name = "string" - - def __init__(self, min_length=None, max_length=None, pattern=None): - self.min_length = min_length - self.max_length = max_length - if isinstance(pattern, six.string_types): - self.pattern = re.compile(pattern) - else: - self.pattern = pattern - - def validate(self, value): - if not isinstance(value, self.basetype): - error = 'Value should be string' - raise ValueError(error) - - if self.min_length is not None and len(value) < self.min_length: - error = 'Value should have a minimum character requirement of %s' \ - % self.min_length - raise ValueError(error) - - if self.max_length is not None and len(value) > self.max_length: - error = 'Value should have a maximum character requirement of %s' \ - % self.max_length - raise ValueError(error) - - if self.pattern is not None and not self.pattern.search(value): - error = 'Value should match the pattern %s' % self.pattern.pattern - raise ValueError(error) - - return value - - -class IPv4AddressType(UserType): - """ - A simple IPv4 type. - """ - basetype = six.string_types - name = "ipv4address" - - @staticmethod - def validate(value): - try: - netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON) - except netaddr.AddrFormatError: - error = 'Value should be IPv4 format' - raise ValueError(error) - else: - return value - - -class IPv6AddressType(UserType): - """ - A simple IPv6 type. - - This type represents IPv6 addresses in the short format. - """ - basetype = six.string_types - name = "ipv6address" - - @staticmethod - def validate(value): - try: - netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON) - except netaddr.AddrFormatError: - error = 'Value should be IPv6 format' - raise ValueError(error) - else: - return value - - -class UuidType(UserType): - """ - A simple UUID type. - - This type allows not only UUID having dashes but also UUID not - having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce' - and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid. - """ - basetype = six.string_types - name = "uuid" - - @staticmethod - def validate(value): - try: - return six.text_type((uuid.UUID(value))) - except (TypeError, ValueError, AttributeError): - error = 'Value should be UUID format' - raise ValueError(error) - - -class Enum(UserType): - """ - A simple enumeration type. Can be based on any non-complex type. - - :param basetype: The actual data type - :param values: A set of possible values - - If nullable, 'None' should be added the values set. - - Example:: - - Gender = Enum(str, 'male', 'female') - Specie = Enum(str, 'cat', 'dog') - - """ - def __init__(self, basetype, *values, **kw): - self.basetype = basetype - self.values = set(values) - name = kw.pop('name', None) - if name is None: - name = "Enum(%s)" % ', '.join((six.text_type(v) for v in values)) - self.name = name - - def validate(self, value): - if value not in self.values: - raise ValueError("Value should be one of: %s" % - ', '.join(map(six.text_type, self.values))) - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -class UnsetType(object): - if sys.version < '3': - def __nonzero__(self): - return False - else: - def __bool__(self): - return False - - def __repr__(self): - return 'Unset' - - -Unset = UnsetType() - -#: A special type that corresponds to the host framework request object. -#: It can only be used in the function parameters, and if so the request object -#: of the host framework will be passed to the function. -HostRequest = object() - - -pod_types = six.integer_types + ( - bytes, text, float, bool) -dt_types = (datetime.date, datetime.time, datetime.datetime) -extra_types = (binary, decimal.Decimal) -native_types = pod_types + dt_types + extra_types -# The types for which we allow promotion to certain numbers. -_promotable_types = six.integer_types + (text, bytes) - - -def iscomplex(datatype): - return inspect.isclass(datatype) \ - and '_wsme_attributes' in datatype.__dict__ - - -def isarray(datatype): - return isinstance(datatype, ArrayType) - - -def isdict(datatype): - return isinstance(datatype, DictType) - - -def validate_value(datatype, value): - if value in (Unset, None): - return value - - # Try to promote the data type to one of our complex types. - if isinstance(datatype, list): - datatype = ArrayType(datatype[0]) - elif isinstance(datatype, dict): - datatype = DictType(*list(datatype.items())[0]) - - # If the datatype has its own validator, use that. - if hasattr(datatype, 'validate'): - return datatype.validate(value) - - # Do type promotion/conversion and data validation for builtin - # types. - v_type = type(value) - if datatype in six.integer_types: - if v_type in _promotable_types: - try: - # Try to turn the value into an int - value = datatype(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is float and v_type in _promotable_types: - try: - value = float(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is text and isinstance(value, bytes): - value = value.decode() - elif datatype is bytes and isinstance(value, text): - value = value.encode() - - if not isinstance(value, datatype): - raise ValueError( - "Wrong type. Expected '%s', got '%s'" % ( - datatype, v_type - )) - return value - - -class wsproperty(property): - """ - A specialised :class:`property` to define typed-property on complex types. - Example:: - - class MyComplexType(wsme.types.Base): - def get_aint(self): - return self._aint - - def set_aint(self, value): - assert avalue < 10 # Dummy input validation - self._aint = value - - aint = wsproperty(int, get_aint, set_aint, mandatory=True) - """ - def __init__(self, datatype, fget, fset=None, - mandatory=False, doc=None, name=None): - property.__init__(self, fget, fset) - #: The property name in the parent python class - self.key = None - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - #: property data type - self.datatype = datatype - #: True if the property is mandatory - self.mandatory = mandatory - - -class wsattr(object): - """ - Complex type attribute definition. - - Example:: - - class MyComplexType(wsme.types.Base): - optionalvalue = int - mandatoryvalue = wsattr(int, mandatory=True) - named_value = wsattr(int, name='named.value') - - After inspection, the non-wsattr attributes will be replaced, and - the above class will be equivalent to:: - - class MyComplexType(wsme.types.Base): - optionalvalue = wsattr(int) - mandatoryvalue = wsattr(int, mandatory=True) - - """ - def __init__(self, datatype, mandatory=False, name=None, default=Unset, - readonly=False): - #: The attribute name in the parent python class. - #: Set by :func:`inspect_class` - self.key = None # will be set by class inspection - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - self._datatype = (datatype,) - #: True if the attribute is mandatory - self.mandatory = mandatory - #: Default value. The attribute will return this instead - #: of :data:`Unset` if no value has been set. - self.default = default - #: If True value cannot be set from json/xml input data - self.readonly = readonly - - self.complextype = None - - def _get_dataholder(self, instance): - dataholder = getattr(instance, '_wsme_dataholder', None) - if dataholder is None: - dataholder = instance._wsme_DataHolderClass() - instance._wsme_dataholder = dataholder - return dataholder - - def __get__(self, instance, owner): - if instance is None: - return self - return getattr( - self._get_dataholder(instance), - self.key, - self.default - ) - - def __set__(self, instance, value): - try: - value = validate_value(self.datatype, value) - except (ValueError, TypeError) as e: - raise exc.InvalidInput(self.name, value, six.text_type(e)) - dataholder = self._get_dataholder(instance) - if value is Unset: - if hasattr(dataholder, self.key): - delattr(dataholder, self.key) - else: - setattr(dataholder, self.key, value) - - def __delete__(self, instance): - self.__set__(instance, Unset) - - def _get_datatype(self): - if isinstance(self._datatype, tuple): - self._datatype = \ - self.complextype().__registry__.resolve_type(self._datatype[0]) - if isinstance(self._datatype, weakref.ref): - return self._datatype() - if isinstance(self._datatype, list): - return [ - item() if isinstance(item, weakref.ref) else item - for item in self._datatype - ] - return self._datatype - - def _set_datatype(self, datatype): - self._datatype = datatype - - #: attribute data type. Can be either an actual type, - #: or a type name, in which case the actual type will be - #: determined when needed (generally just before scanning the api). - datatype = property(_get_datatype, _set_datatype) - - -def iswsattr(attr): - if inspect.isfunction(attr) or inspect.ismethod(attr): - return False - if isinstance(attr, property) and not isinstance(attr, wsproperty): - return False - return True - - -def sort_attributes(class_, attributes): - """Sort a class attributes list. - - 3 mechanisms are attempted : - - #. Look for a _wsme_attr_order attribute on the class_. This allow - to define an arbitrary order of the attributes (useful for - generated types). - - #. Access the object source code to find the declaration order. - - #. Sort by alphabetically""" - - if not len(attributes): - return - - attrs = dict((a.key, a) for a in attributes) - - if hasattr(class_, '_wsme_attr_order'): - names_order = class_._wsme_attr_order - else: - names = attrs.keys() - names_order = [] - try: - lines = [] - for cls in inspect.getmro(class_): - if cls is object: - continue - lines[len(lines):] = inspect.getsourcelines(cls)[0] - for line in lines: - line = line.strip().replace(" ", "") - if '=' in line: - aname = line[:line.index('=')] - if aname in names and aname not in names_order: - names_order.append(aname) - if len(names_order) < len(names): - names_order.extend(( - name for name in names if name not in names_order)) - assert len(names_order) == len(names) - except (TypeError, IOError): - names_order = list(names) - names_order.sort() - - attributes[:] = [attrs[name] for name in names_order] - - -def inspect_class(class_): - """Extract a list of (name, wsattr|wsproperty) for the given class_""" - attributes = [] - for name, attr in inspect.getmembers(class_, iswsattr): - if name.startswith('_'): - continue - if inspect.isroutine(attr): - continue - - if isinstance(attr, (wsattr, wsproperty)): - attrdef = attr - else: - if attr not in native_types and ( - inspect.isclass(attr) or - isinstance(attr, (list, dict))): - register_type(attr) - attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr) - - attrdef.key = name - if attrdef.name is None: - attrdef.name = name - attrdef.complextype = weakref.ref(class_) - attributes.append(attrdef) - setattr(class_, name, attrdef) - - sort_attributes(class_, attributes) - return attributes - - -def list_attributes(class_): - """ - Returns a list of a complex type attributes. - """ - if not iscomplex(class_): - raise TypeError("%s is not a registered type") - return class_._wsme_attributes - - -def make_dataholder(class_): - # the slots are computed outside the class scope to avoid - # 'attr' to pullute the class namespace, which leads to weird - # things if one of the slots is named 'attr'. - slots = [attr.key for attr in class_._wsme_attributes] - - class DataHolder(object): - __slots__ = slots - - DataHolder.__name__ = class_.__name__ + 'DataHolder' - return DataHolder - - -class Registry(object): - def __init__(self): - self._complex_types = [] - self.array_types = set() - self.dict_types = set() - - @property - def complex_types(self): - return [t() for t in self._complex_types if t()] - - def register(self, class_): - """ - Make sure a type is registered. - - It is automatically called by :class:`expose() <wsme.expose>` - and :class:`validate() <wsme.validate>`. - Unless you want to control when the class inspection is done there - is no need to call it. - """ - if class_ is None or \ - class_ in native_types or \ - isusertype(class_) or iscomplex(class_) or \ - isarray(class_) or isdict(class_): - return class_ - - if isinstance(class_, list): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = ArrayType(class_[0]) - self.register(dt.item_type) - self.array_types.add(dt) - return dt - - if isinstance(class_, dict): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = DictType(*list(class_.items())[0]) - self.register(dt.value_type) - self.dict_types.add(dt) - return dt - - class_._wsme_attributes = None - class_._wsme_attributes = inspect_class(class_) - class_._wsme_DataHolderClass = make_dataholder(class_) - - class_.__registry__ = self - self._complex_types.append(weakref.ref(class_)) - return class_ - - def reregister(self, class_): - """Register a type which may already have been registered. - """ - self._unregister(class_) - return self.register(class_) - - def _unregister(self, class_): - """Remove a previously registered type. - """ - # Clear the existing attribute reference so it is rebuilt if - # the class is registered again later. - if hasattr(class_, '_wsme_attributes'): - del class_._wsme_attributes - # FIXME(dhellmann): This method does not recurse through the - # types like register() does. Should it? - if isinstance(class_, list): - at = ArrayType(class_[0]) - try: - self.array_types.remove(at) - except KeyError: - pass - elif isinstance(class_, dict): - key_type, value_type = list(class_.items())[0] - self.dict_types = set( - dt for dt in self.dict_types - if (dt.key_type, dt.value_type) != (key_type, value_type) - ) - # We can't use remove() here because the items in - # _complex_types are weakref objects pointing to the classes, - # so we can't compare with them directly. - self._complex_types = [ - ct for ct in self._complex_types - if ct() is not class_ - ] - - def lookup(self, typename): - log.debug('Lookup %s' % typename) - modname = None - if '.' in typename: - modname, typename = typename.rsplit('.', 1) - for ct in self._complex_types: - ct = ct() - if ct is not None and typename == ct.__name__ and ( - modname is None or modname == ct.__module__): - return ct - - def resolve_type(self, type_): - if isinstance(type_, six.string_types): - return self.lookup(type_) - if isinstance(type_, list): - type_ = ArrayType(type_[0]) - if isinstance(type_, dict): - type_ = DictType(list(type_.keys())[0], list(type_.values())[0]) - if isinstance(type_, ArrayType): - type_ = ArrayType(self.resolve_type(type_.item_type)) - self.array_types.add(type_) - elif isinstance(type_, DictType): - type_ = DictType( - type_.key_type, - self.resolve_type(type_.value_type) - ) - self.dict_types.add(type_) - else: - type_ = self.register(type_) - return type_ - - -# Default type registry -registry = Registry() - - -def register_type(class_): - return registry.register(class_) - - -class BaseMeta(type): - def __new__(cls, name, bases, dct): - if bases and bases[0] is not object and '__registry__' not in dct: - dct['__registry__'] = registry - return type.__new__(cls, name, bases, dct) - - def __init__(cls, name, bases, dct): - if bases and bases[0] is not object and cls.__registry__: - cls.__registry__.register(cls) - - -class Base(six.with_metaclass(BaseMeta)): - """Base type for complex types""" - def __init__(self, **kw): - for key, value in kw.items(): - if hasattr(self, key): - setattr(self, key, value) - - -class File(Base): - """A complex type that represents a file. - - In the particular case of protocol accepting form encoded data as - input, File can be loaded from a form file field. - """ - #: The file name - filename = wsattr(text) - - #: Mime type of the content - contenttype = wsattr(text) - - def _get_content(self): - if self._content is None and self._file: - self._content = self._file.read() - return self._content - - def _set_content(self, value): - self._content = value - self._file = None - - #: File content - content = wsproperty(binary, _get_content, _set_content) - - def __init__(self, filename=None, file=None, content=None, - contenttype=None, fieldstorage=None): - self.filename = filename - self.contenttype = contenttype - self._file = file - self._content = content - - if fieldstorage is not None: - if fieldstorage.file: - self._file = fieldstorage.file - self.filename = fieldstorage.filename - self.contenttype = text(fieldstorage.type) - else: - self._content = fieldstorage.value - - @property - def file(self): - if self._file is None and self._content: - self._file = six.BytesIO(self._content) - return self._file - - -class DynamicBase(Base): - """Base type for complex types for which all attributes are not - defined when the class is constructed. - - This class is meant to be used as a base for types that have - properties added after the main class is created, such as by - loading plugins. - - """ - - @classmethod - def add_attributes(cls, **attrs): - """Add more attributes - - The arguments should be valid Python attribute names - associated with a type for the new attribute. - - """ - for n, t in attrs.items(): - setattr(cls, n, t) - cls.__registry__.reregister(cls) |