diff options
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/util/compat.py | 281 |
1 files changed, 138 insertions, 143 deletions
diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py index 02be2b3b9..b01471edf 100644 --- a/lib/sqlalchemy/util/compat.py +++ b/lib/sqlalchemy/util/compat.py @@ -7,13 +7,11 @@ """Handle Python version/platform incompatibilities.""" +import collections +import contextlib import sys -from contextlib import contextmanager - -try: - import threading -except ImportError: - import dummy_threading as threading +import operator +import time py36 = sys.version_info >= (3, 6) py33 = sys.version_info >= (3, 3) @@ -27,16 +25,19 @@ pypy = hasattr(sys, 'pypy_version_info') win32 = sys.platform.startswith('win') cpython = not pypy and not jython # TODO: something better for this ? -import collections +contextmanager = contextlib.contextmanager +dottedgetter = operator.attrgetter +namedtuple = collections.namedtuple next = next -if py3k: - import pickle -else: - try: - import cPickle as pickle - except ImportError: - import pickle +ArgSpec = collections.namedtuple("ArgSpec", + ["args", "varargs", "keywords", "defaults"]) + +try: + import threading +except ImportError: + import dummy_threading as threading + # work around http://bugs.python.org/issue2646 if py265: @@ -44,24 +45,20 @@ if py265: else: safe_kwarg = str -ArgSpec = collections.namedtuple("ArgSpec", - ["args", "varargs", "keywords", "defaults"]) if py3k: + import base64 import builtins - - from inspect import getfullargspec as inspect_getfullargspec - from urllib.parse import (quote_plus, unquote_plus, - parse_qsl, quote, unquote) import configparser - from io import StringIO + import itertools + import pickle + from functools import reduce + from inspect import getfullargspec as inspect_getfullargspec from io import BytesIO as byte_buffer - - def inspect_getargspec(func): - return ArgSpec( - *inspect_getfullargspec(func)[0:4] - ) + from io import StringIO + from itertools import zip_longest + from urllib.parse import (quote_plus, unquote_plus, parse_qsl, quote, unquote) string_types = str, binary_types = bytes, @@ -70,52 +67,67 @@ if py3k: int_types = int, iterbytes = iter - def u(s): - return s + itertools_filterfalse = itertools.filterfalse + itertools_filter = filter + itertools_imap = map - def ue(s): - return s + exec_ = getattr(builtins, 'exec') + import_ = getattr(builtins, '__import__') + print_ = getattr(builtins, "print") def b(s): return s.encode("latin-1") - if py32: - callable = callable - else: - def callable(fn): - return hasattr(fn, '__call__') + def b64decode(x): + return base64.b64decode(x.encode('ascii')) + + + def b64encode(x): + return base64.b64encode(x).decode('ascii') def cmp(a, b): return (a > b) - (a < b) - from functools import reduce + def inspect_getargspec(func): + return ArgSpec( + *inspect_getfullargspec(func)[0:4] + ) - print_ = getattr(builtins, "print") + def reraise(tp, value, tb=None, cause=None): + if cause is not None: + assert cause is not value, "Same cause emitted" + value.__cause__ = cause + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value - import_ = getattr(builtins, '__import__') + def u(s): + return s - import itertools - itertools_filterfalse = itertools.filterfalse - itertools_filter = filter - itertools_imap = map - from itertools import zip_longest + def ue(s): + return s + if py32: + callable = callable + else: + def callable(fn): + return hasattr(fn, '__call__') +else: import base64 + import ConfigParser as configparser + import itertools - def b64encode(x): - return base64.b64encode(x).decode('ascii') - - def b64decode(x): - return base64.b64decode(x.encode('ascii')) - -else: + from StringIO import StringIO + from cStringIO import StringIO as byte_buffer from inspect import getargspec as inspect_getfullargspec - inspect_getargspec = inspect_getfullargspec + from itertools import izip_longest as zip_longest from urllib import quote_plus, unquote_plus, quote, unquote from urlparse import parse_qsl - import ConfigParser as configparser - from StringIO import StringIO - from cStringIO import StringIO as byte_buffer + + try: + import cPickle as pickle + except ImportError: + import pickle string_types = basestring, binary_types = bytes, @@ -123,35 +135,36 @@ else: text_type = unicode int_types = int, long - def iterbytes(buf): - return (ord(byte) for byte in buf) + inspect_getargspec = inspect_getfullargspec - def u(s): - # this differs from what six does, which doesn't support non-ASCII - # strings - we only use u() with - # literal source strings, and all our source files with non-ascii - # in them (all are tests) are utf-8 encoded. - return unicode(s, "utf-8") + callable = callable + cmp = cmp + reduce = reduce - def ue(s): - return unicode(s, "unicode_escape") + b64encode = base64.b64encode + b64decode = base64.b64decode + + itertools_filterfalse = itertools.ifilterfalse + itertools_filter = itertools.ifilter + itertools_imap = itertools.imap def b(s): return s + def exec_(func_text, globals_, lcl=None): + if lcl is None: + exec('exec func_text in globals_') + else: + exec('exec func_text in globals_, lcl') + + def iterbytes(buf): + return (ord(byte) for byte in buf) + def import_(*args): if len(args) == 4: args = args[0:3] + ([str(arg) for arg in args[3]],) return __import__(*args) - callable = callable - cmp = cmp - reduce = reduce - - import base64 - b64encode = base64.b64encode - b64decode = base64.b64decode - def print_(*args, **kwargs): fp = kwargs.pop("file", sys.stdout) if fp is None: @@ -161,11 +174,23 @@ else: arg = str(arg) fp.write(arg) - import itertools - itertools_filterfalse = itertools.ifilterfalse - itertools_filter = itertools.ifilter - itertools_imap = itertools.imap - from itertools import izip_longest as zip_longest + def u(s): + # this differs from what six does, which doesn't support non-ASCII + # strings - we only use u() with + # literal source strings, and all our source files with non-ascii + # in them (all are tests) are utf-8 encoded. + return unicode(s, "utf-8") + + def ue(s): + return unicode(s, "unicode_escape") + + # not as nice as that of Py3K, but at least preserves + # the code line where the issue occurred + exec("def reraise(tp, value, tb=None, cause=None):\n" + " if cause is not None:\n" + " assert cause is not value, 'Same cause emitted'\n" + " raise tp, value, tb\n") + if py35: from inspect import formatannotation @@ -198,6 +223,7 @@ if py35: if arg in annotations: result += ': ' + formatannotation(annotations[arg]) return result + specs = [] if defaults: firstdefault = len(args) - len(defaults) @@ -206,95 +232,45 @@ if py35: if defaults and i >= firstdefault: spec = spec + formatvalue(defaults[i - firstdefault]) specs.append(spec) + if varargs is not None: specs.append(formatvarargs(formatargandannotation(varargs))) else: if kwonlyargs: specs.append('*') + if kwonlyargs: for kwonlyarg in kwonlyargs: spec = formatargandannotation(kwonlyarg) if kwonlydefaults and kwonlyarg in kwonlydefaults: spec += formatvalue(kwonlydefaults[kwonlyarg]) specs.append(spec) + if varkw is not None: specs.append(formatvarkw(formatargandannotation(varkw))) + result = '(' + ', '.join(specs) + ')' if 'return' in annotations: result += formatreturns(formatannotation(annotations['return'])) return result - else: from inspect import formatargspec as inspect_formatargspec - -import time if win32 or jython: time_func = time.clock else: time_func = time.time -from collections import namedtuple -from operator import attrgetter as dottedgetter - - -if py3k: - def reraise(tp, value, tb=None, cause=None): - if cause is not None: - assert cause is not value, "Same cause emitted" - value.__cause__ = cause - if value.__traceback__ is not tb: - raise value.with_traceback(tb) - raise value - -else: - # not as nice as that of Py3K, but at least preserves - # the code line where the issue occurred - exec("def reraise(tp, value, tb=None, cause=None):\n" - " if cause is not None:\n" - " assert cause is not value, 'Same cause emitted'\n" - " raise tp, value, tb\n") - - -def raise_from_cause(exception, exc_info=None): - if exc_info is None: - exc_info = sys.exc_info() - exc_type, exc_value, exc_tb = exc_info - cause = exc_value if exc_value is not exception else None - reraise(type(exception), exception, tb=exc_tb, cause=cause) - -if py3k: - exec_ = getattr(builtins, 'exec') +# Fix deprecation of accessing ABCs straight from collections module +# (which will stop working in 3.8). +if py33: + import collections.abc as collections_abc else: - def exec_(func_text, globals_, lcl=None): - if lcl is None: - exec('exec func_text in globals_') - else: - exec('exec func_text in globals_, lcl') - - -def with_metaclass(meta, *bases): - """Create a base class with a metaclass. - - Drops the middle class upon creation. - - Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/ - - """ - - class metaclass(meta): - __call__ = type.__call__ - __init__ = type.__init__ - - def __new__(cls, name, this_bases, d): - if this_bases is None: - return type.__new__(cls, name, (), d) - return meta(name, bases, d) - return metaclass('temporary_class', None, {}) + import collections as collections_abc -@contextmanager +@contextlib.contextmanager def nested(*managers): """Implement contextlib.nested, mostly for unit tests. @@ -329,10 +305,29 @@ def nested(*managers): reraise(exc[0], exc[1], exc[2]) -# Fix deprecation of accessing ABCs straight from collections module -# (which will stop working in 3.8). -if py33: - import collections.abc as collections_abc -else: - import collections as collections_abc +def raise_from_cause(exception, exc_info=None): + if exc_info is None: + exc_info = sys.exc_info() + exc_type, exc_value, exc_tb = exc_info + cause = exc_value if exc_value is not exception else None + reraise(type(exception), exception, tb=exc_tb, cause=cause) + +def with_metaclass(meta, *bases): + """Create a base class with a metaclass. + + Drops the middle class upon creation. + + Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/ + + """ + + class metaclass(meta): + __call__ = type.__call__ + __init__ = type.__init__ + + def __new__(cls, name, this_bases, d): + if this_bases is None: + return type.__new__(cls, name, (), d) + return meta(name, bases, d) + return metaclass('temporary_class', None, {}) |
