summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/exclusions.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing/exclusions.py')
-rw-r--r--lib/sqlalchemy/testing/exclusions.py269
1 files changed, 269 insertions, 0 deletions
diff --git a/lib/sqlalchemy/testing/exclusions.py b/lib/sqlalchemy/testing/exclusions.py
new file mode 100644
index 000000000..ba2eebe4f
--- /dev/null
+++ b/lib/sqlalchemy/testing/exclusions.py
@@ -0,0 +1,269 @@
+import operator
+from nose import SkipTest
+from sqlalchemy.util import decorator
+from . import config
+from sqlalchemy import util
+
+
+def fails_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if not predicate():
+ return fn(*args, **kw)
+ else:
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected (%s): %s " % (
+ fn.__name__, predicate, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' (%s)" %
+ (fn.__name__, predicate))
+ return decorate
+
+def skip_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if predicate():
+ if reason:
+ msg = "'%s' : %s" % (
+ fn.__name__,
+ reason
+ )
+ else:
+ msg = "'%s': %s" % (
+ fn.__name__, predicate
+ )
+ raise SkipTest(msg)
+ else:
+ return fn(*args, **kw)
+ return decorate
+
+def only_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return skip_if(NotPredicate(predicate), reason)
+
+def succeeds_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return fails_if(NotPredicate(predicate), reason)
+
+class Predicate(object):
+ @classmethod
+ def as_predicate(cls, predicate):
+ if isinstance(predicate, Predicate):
+ return predicate
+ elif isinstance(predicate, list):
+ return OrPredicate([cls.as_predicate(pred) for pred in predicate])
+ elif isinstance(predicate, tuple):
+ return SpecPredicate(*predicate)
+ elif isinstance(predicate, basestring):
+ return SpecPredicate(predicate, None, None)
+ elif util.callable(predicate):
+ return LambdaPredicate(predicate)
+ else:
+ assert False, "unknown predicate type: %s" % predicate
+
+class SpecPredicate(Predicate):
+ def __init__(self, db, op=None, spec=None, description=None):
+ self.db = db
+ self.op = op
+ self.spec = spec
+ self.description = description
+
+ _ops = {
+ '<': operator.lt,
+ '>': operator.gt,
+ '==': operator.eq,
+ '!=': operator.ne,
+ '<=': operator.le,
+ '>=': operator.ge,
+ 'in': operator.contains,
+ 'between': lambda val, pair: val >= pair[0] and val <= pair[1],
+ }
+
+ def __call__(self, engine=None):
+ if engine is None:
+ engine = config.db
+
+ if "+" in self.db:
+ dialect, driver = self.db.split('+')
+ else:
+ dialect, driver = self.db, None
+
+ if dialect and engine.name != dialect:
+ return False
+ if driver is not None and engine.driver != driver:
+ return False
+
+ if self.op is not None:
+ assert driver is None, "DBAPI version specs not supported yet"
+
+ version = _server_version(engine)
+ oper = hasattr(self.op, '__call__') and self.op \
+ or self._ops[self.op]
+ return oper(version, self.spec)
+ else:
+ return True
+
+ def _as_string(self, negate=False):
+ if self.description is not None:
+ return self.description
+ elif self.op is None:
+ if negate:
+ return "not %s" % self.db
+ else:
+ return "%s" % self.db
+ else:
+ if negate:
+ return "not %s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+ else:
+ return "%s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+
+ def __str__(self):
+ return self._as_string()
+
+class LambdaPredicate(Predicate):
+ def __init__(self, lambda_, description=None, args=None, kw=None):
+ self.lambda_ = lambda_
+ self.args = args or ()
+ self.kw = kw or {}
+ if description:
+ self.description = description
+ elif lambda_.__doc__:
+ self.description = lambda_.__doc__
+ else:
+ self.description = "custom function"
+
+ def __call__(self):
+ return self.lambda_(*self.args, **self.kw)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return "not " + self.description
+ else:
+ return self.description
+
+ def __str__(self):
+ return self._as_string()
+
+class NotPredicate(Predicate):
+ def __init__(self, predicate):
+ self.predicate = predicate
+
+ def __call__(self, *arg, **kw):
+ return not self.predicate(*arg, **kw)
+
+ def __str__(self):
+ return self.predicate._as_string(True)
+
+class OrPredicate(Predicate):
+ def __init__(self, predicates, description=None):
+ self.predicates = predicates
+ self.description = description
+
+ def __call__(self, *arg, **kw):
+ for pred in self.predicates:
+ if pred(*arg, **kw):
+ self._str = pred
+ return True
+ return False
+
+ _str = None
+
+ def _eval_str(self, negate=False):
+ if self._str is None:
+ if negate:
+ conjunction = " and "
+ else:
+ conjunction = " or "
+ return conjunction.join(p._as_string(negate=negate)
+ for p in self.predicates)
+ else:
+ return self._str._as_string(negate=negate)
+
+ def _negation_str(self):
+ if self.description is not None:
+ return "Not " + (self.description % {"spec": self._str})
+ else:
+ return self._eval_str(negate=True)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return self._negation_str()
+ else:
+ if self.description is not None:
+ return self.description % {"spec": self._str}
+ else:
+ return self._eval_str()
+
+ def __str__(self):
+ return self._as_string()
+
+_as_predicate = Predicate.as_predicate
+
+def _is_excluded(db, op, spec):
+ return SpecPredicate(db, op, spec)()
+
+def _server_version(engine):
+ """Return a server_version_info tuple."""
+
+ # force metadata to be retrieved
+ conn = engine.connect()
+ version = getattr(engine.dialect, 'server_version_info', ())
+ conn.close()
+ return version
+
+def db_spec(*dbs):
+ return OrPredicate(
+ Predicate.as_predicate(db) for db in dbs
+ )
+
+def open(fn):
+ return fn
+
+@decorator
+def future(fn, *args, **kw):
+ return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")
+
+def fails_on(db, reason):
+ return fails_if(SpecPredicate(db), reason)
+
+def fails_on_everything_except(*dbs):
+ return succeeds_if(
+ OrPredicate([
+ SpecPredicate(db) for db in dbs
+ ])
+ )
+
+def skip(db, reason):
+ return skip_if(SpecPredicate(db), reason)
+
+def only_on(dbs, reason):
+ return only_if(
+ OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
+ )
+
+
+def exclude(db, op, spec, reason):
+ return skip_if(SpecPredicate(db, op, spec), reason)
+
+
+def against(*queries):
+ return OrPredicate([
+ Predicate.as_predicate(query)
+ for query in queries
+ ])()